Skip to content

Commit ada71b6

Browse files
authored
feat!: add catalog-managed table creation utilities (delta-io#2203)
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta-kernel-rs/pull/2203/files/8cf518a944cd62f87f35cad6db205195b224c513..9c6826254bddecce6b2fe428aa9a1b6a1af1b6d2) to review incremental changes. - [stack/kernel-catalog-managed-create](delta-io#2293) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/2293/files)] - [**stack/ccv2-create**](delta-io#2203) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/2203/files/8cf518a944cd62f87f35cad6db205195b224c513..9c6826254bddecce6b2fe428aa9a1b6a1af1b6d2)] - [stack/ccv2-create-pt2](delta-io#2247) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/2247/files/9c6826254bddecce6b2fe428aa9a1b6a1af1b6d2..6847bd06fd174c05674b34e4970671d5cf073d5a)] - [stack/create-table-utils-pt3](delta-io#2250) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/2250/files/6847bd06fd174c05674b34e4970671d5cf073d5a..e1035044d9e0c419eaee5d4ebb1284adb8646bf0)] - [stack/create-table-utils-pt4](delta-io#2254) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/2254/files/e1035044d9e0c419eaee5d4ebb1284adb8646bf0..ba618b1a85366699b2312ea3430d6adec8aefc4d)] --------- ## What changes are proposed in this pull request? Adds `create_utils` module to `delta-kernel-unity-catalog` with two public functions for building the required table properties during UC catalog-managed table creation: 1. `get_required_properties_for_disk(uc_table_id)` -- returns properties that must be written to disk in `000.json` (catalogManaged, vacuumProtocolCheck, tableId; ICT is enabled automatically by kernel's CREATE TABLE) 2. `get_final_required_properties_for_uc(snapshot, engine)` -- extracts post-commit properties to send to UC (protocol versions, feature signals, metadata config, clustering columns, ICT timestamp) 3. Also adds public getters on `Snapshot` for protocol/metadata inspection: `min_reader_version`, `min_writer_version`, `reader_features`, `writer_features`, `metadata_configuration`, and `get_clustering_columns`. 
Other changes: - Adds `physical_to_logical_column_name` to `column_mapping.rs` for converting physical clustering column names back to logical names - Adds `CatalogManaged` to `ALLOWED_DELTA_FEATURES` in `create_table` builder - Extracts shared UC constants to `constants.rs` module ## How was this change tested? 1. `test_get_required_properties_for_disk` - verifies all 3 disk properties 2. `test_get_final_required_properties_for_uc` - round-trip test: creates table, loads snapshot, extracts UC properties, verifies protocol versions, feature signals, metadata config, ICT timestamp, and version 3. `test_get_final_required_properties_for_uc_with_clustering` - same with clustering columns, verifies JSON serialization 4. `test_public_protocol_getters` - tests Snapshot protocol getters against fixture 5. `test_metadata_configuration` - tests Snapshot metadata config getter 6. `test_catalog_managed_feature_signal_accepted` - verifies CatalogManaged in ALLOWED_DELTA_FEATURES
1 parent 743dc2f commit ada71b6

10 files changed

Lines changed: 646 additions & 6 deletions

File tree

delta-kernel-unity-catalog/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ arrow-57 = ["delta_kernel/arrow-57"]
2020
arrow-56 = ["delta_kernel/arrow-56"]
2121

2222
[dependencies]
23-
delta_kernel = { path = "../kernel", features = ["catalog-managed"] }
23+
delta_kernel = { path = "../kernel", features = ["catalog-managed", "internal-api"] }
2424
unity-catalog-delta-client-api = { path = "../unity-catalog-delta-client-api" }
2525
itertools = "0.14"
2626
serde = { version = "1.0", features = ["derive"] }
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
//! Shared constants for UC catalog-managed table operations.
2+
3+
/// Property key for the UC table ID, stored in Delta metadata configuration.
4+
pub(crate) const UC_TABLE_ID_KEY: &str = "io.unitycatalog.tableId";
5+
/// Value of a `delta.feature.<name>` property that marks the table feature as supported.
6+
pub(crate) const FEATURE_SUPPORTED: &str = "supported";
7+
/// Feature signal key for catalog-managed tables.
8+
pub(crate) const CATALOG_MANAGED_FEATURE_KEY: &str = "delta.feature.catalogManaged";
9+
/// Feature signal key for vacuum protocol check.
10+
pub(crate) const VACUUM_PROTOCOL_CHECK_FEATURE_KEY: &str = "delta.feature.vacuumProtocolCheck";
11+
/// UC property for the last committed version.
12+
pub(crate) const METASTORE_LAST_UPDATE_VERSION: &str = "delta.lastUpdateVersion";
13+
/// UC property for the last commit timestamp.
14+
pub(crate) const METASTORE_LAST_COMMIT_TIMESTAMP: &str = "delta.lastCommitTimestamp";

delta-kernel-unity-catalog/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
//! UCKernelClient implements a high-level interface for interacting with Delta Tables in Unity Catalog.
22
33
mod committer;
4+
mod constants;
5+
mod utils;
46
pub use committer::UCCommitter;
7+
pub use utils::{get_final_required_properties_for_uc, get_required_properties_for_disk};
58

69
use std::sync::Arc;
710

Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
//! Utilities for Unity Catalog catalog-managed table creation.
2+
//!
3+
//! These utilities help connectors create UC-managed tables by providing the required properties
4+
//! for both the Delta log (disk) and the UC server registration.
5+
//!
6+
//! # Usage
7+
//!
8+
//! ```ignore
9+
//! // Step 1: Get staging info from UC
10+
//! let staging_info = my_uc_client.get_staging_table(..);
11+
//!
12+
//! // Step 2: Build and commit the create-table transaction
13+
//! let disk_props = get_required_properties_for_disk(staging_info.table_id);
14+
//! let create_table_txn = kernel::create_table(path, schema, "MyApp/1.0")
15+
//! .with_table_properties(disk_props)
16+
//! .build(engine, committer)?;
17+
//! let result = create_table_txn.commit(engine);
18+
//!
19+
//! // Step 3: Finalize table in UC
20+
//! let snapshot = /* load post-commit snapshot at version 0 */;
21+
//! let uc_props = get_final_required_properties_for_uc(&snapshot, engine)?;
22+
//! my_uc_client.create_table(.., uc_props);
23+
//! ```
24+
25+
use std::collections::HashMap;
26+
27+
use delta_kernel::{Engine, Snapshot};
28+
29+
use crate::constants::{
30+
CATALOG_MANAGED_FEATURE_KEY, FEATURE_SUPPORTED, METASTORE_LAST_COMMIT_TIMESTAMP,
31+
METASTORE_LAST_UPDATE_VERSION, UC_TABLE_ID_KEY, VACUUM_PROTOCOL_CHECK_FEATURE_KEY,
32+
};
33+
34+
/// Returns the table properties that must be written to disk (in `000.json`) for a UC
35+
/// catalog-managed table creation.
36+
///
37+
/// These properties must be persisted in the Delta log so that the table is recognized as
38+
/// catalog-managed. Note: ICT enablement is handled automatically by kernel's CREATE TABLE
39+
/// when the `catalogManaged` feature is present.
///
/// # Arguments
///
/// - `uc_table_id` -- the UC table ID, stored verbatim under the `io.unitycatalog.tableId` key.
///
/// # Returns
///
/// A map with exactly three entries: `delta.feature.catalogManaged = "supported"`,
/// `delta.feature.vacuumProtocolCheck = "supported"`, and `io.unitycatalog.tableId` set to
/// `uc_table_id`.
40+
pub fn get_required_properties_for_disk(uc_table_id: &str) -> HashMap<String, String> {
41+
[
42+
(CATALOG_MANAGED_FEATURE_KEY, FEATURE_SUPPORTED),
43+
(VACUUM_PROTOCOL_CHECK_FEATURE_KEY, FEATURE_SUPPORTED),
44+
(UC_TABLE_ID_KEY, uc_table_id),
45+
]
46+
.into_iter()
47+
// Promote the borrowed `&str` pairs to owned `String`s for the returned map.
.map(|(k, v)| (k.to_string(), v.to_string()))
48+
.collect()
49+
}
50+
51+
/// Extracts the properties that must be sent to the UC server when finalizing a table creation.
52+
///
53+
/// These properties are derived from the post-commit snapshot (after `000.json` has
54+
/// been written). The connector should pass these to the UC `create_table` API.
55+
///
56+
/// # Properties returned
57+
///
58+
/// - All entries from `Metadata.configuration` (includes `io.unitycatalog.tableId`, user props)
59+
/// - `delta.minReaderVersion` and `delta.minWriterVersion`
60+
/// - `delta.feature.<name> = "supported"` for every reader and writer table feature
61+
/// - `delta.lastUpdateVersion` -- the snapshot version
62+
/// - `delta.lastCommitTimestamp` -- the snapshot's in-commit timestamp (requires ICT enabled)
63+
/// - `clusteringColumns` -- clustering columns serialized as a JSON array of column paths, each
/// path itself an array of name components, e.g. `[["region"],["address","city"]]` (only
/// present when the snapshot reports clustering columns)
64+
///
65+
/// # Clustering columns
66+
///
67+
/// Clustering columns are returned as logical column names. When column mapping is enabled,
68+
/// the physical names stored in domain metadata are converted to logical names using the
69+
/// table schema.
///
/// # Errors
///
/// Returns an error if:
///
/// - `snapshot` is not at version 0 (this helper only supports table-creation snapshots),
/// - reading the in-commit timestamp fails, or the snapshot has no in-commit timestamp,
/// - reading the logical clustering columns fails, or
/// - the clustering columns cannot be serialized to JSON.
70+
pub fn get_final_required_properties_for_uc(
71+
snapshot: &Snapshot,
72+
engine: &dyn Engine,
73+
) -> delta_kernel::DeltaResult<HashMap<String, String>> {
74+
// Guard: only the table-creation commit (version 0) is accepted.
if snapshot.version() != 0 {
75+
return Err(delta_kernel::Error::generic(format!(
76+
"get_final_required_properties_for_uc is only valid for version 0 (table creation) \
77+
snapshots, but snapshot is at version {}",
78+
snapshot.version()
79+
)));
80+
}
81+
82+
// Start with metadata configuration (user + delta properties)
83+
let mut properties = snapshot.metadata_configuration().clone();
84+
85+
// Protocol-derived properties (versions + feature signals)
// Every `extend`/`insert` below replaces a same-named key copied from the configuration.
86+
properties.extend(snapshot.get_protocol_derived_properties());
87+
88+
// UC-specific properties
89+
properties.insert(
90+
METASTORE_LAST_UPDATE_VERSION.to_string(),
91+
snapshot.version().to_string(),
92+
);
93+
// A missing in-commit timestamp is treated as a hard error rather than skipped.
let timestamp = snapshot.get_in_commit_timestamp(engine)?.ok_or_else(|| {
94+
delta_kernel::Error::generic(
95+
"In-commit timestamp is required for UC catalog-managed tables but was not found",
96+
)
97+
})?;
98+
properties.insert(
99+
METASTORE_LAST_COMMIT_TIMESTAMP.to_string(),
100+
timestamp.to_string(),
101+
);
102+
103+
// Clustering columns as logical names (if present)
104+
if let Some(columns) = snapshot.get_logical_clustering_columns(engine)? {
105+
// One inner Vec per column, holding that column's path components in order.
let column_arrays: Vec<Vec<&str>> = columns
106+
.iter()
107+
.map(|c| c.path().iter().map(|s| s.as_str()).collect())
108+
.collect();
109+
let json = serde_json::to_string(&column_arrays).map_err(|e| {
110+
delta_kernel::Error::generic(format!("Failed to serialize clustering columns: {e}"))
111+
})?;
112+
properties.insert("clusteringColumns".to_string(), json);
113+
}
114+
115+
Ok(properties)
116+
}
117+
118+
#[cfg(test)]
119+
mod tests {
120+
use super::*;
121+
122+
use std::sync::Arc;
123+
124+
use delta_kernel::committer::{CommitMetadata, CommitResponse, Committer, PublishMetadata};
125+
use delta_kernel::engine::default::DefaultEngineBuilder;
126+
use delta_kernel::object_store::memory::InMemory;
127+
use delta_kernel::schema::{DataType, StructField, StructType};
128+
use delta_kernel::snapshot::Snapshot;
129+
use delta_kernel::transaction::create_table::create_table;
130+
use delta_kernel::transaction::data_layout::DataLayout;
131+
use delta_kernel::{DeltaResult, Engine, FileMeta, FilteredEngineData};
132+
133+
/// A mock catalog committer that writes directly to the published path.
///
/// It reports itself as a catalog committer and treats `publish` as a no-op, so tests can
/// create catalog-managed tables without a real catalog.
134+
struct MockCatalogCommitter;
135+
impl Committer for MockCatalogCommitter {
136+
/// Writes `actions` as a JSON file at the published commit path and reports the commit as
/// `Committed`.
fn commit(
137+
&self,
138+
engine: &dyn Engine,
139+
actions: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
140+
commit_metadata: CommitMetadata,
141+
) -> DeltaResult<CommitResponse> {
142+
let path = commit_metadata.published_commit_path()?;
143+
// NOTE(review): the trailing `false` is presumably the overwrite flag -- confirm against
// `JsonHandler::write_json_file`.
engine
144+
.json_handler()
145+
.write_json_file(&path, Box::new(actions), false)?;
146+
Ok(CommitResponse::Committed {
147+
// NOTE(review): arguments are presumably (location, last_modified, size); the size is
// hard-coded to 0 here -- confirm against `FileMeta::new`.
file_meta: FileMeta::new(path, commit_metadata.in_commit_timestamp(), 0),
148+
})
149+
}
150+
/// Always `true`: this mock identifies itself as a catalog committer.
fn is_catalog_committer(&self) -> bool {
151+
true
152+
}
153+
/// No-op: `commit` already wrote to the published path.
fn publish(&self, _: &dyn Engine, _: PublishMetadata) -> DeltaResult<()> {
154+
Ok(())
155+
}
156+
}
157+
158+
/// Checks that the disk properties contain exactly the two feature signals and the UC table ID.
#[test]
159+
fn test_get_required_properties_for_disk() {
160+
let props = get_required_properties_for_disk("my-uc-table-123");
161+
// Exactly three entries: no ICT property is emitted by this helper.
assert_eq!(props.len(), 3);
162+
assert_eq!(props["delta.feature.catalogManaged"], "supported");
163+
assert_eq!(props["delta.feature.vacuumProtocolCheck"], "supported");
164+
assert_eq!(props["io.unitycatalog.tableId"], "my-uc-table-123");
165+
}
166+
167+
/// Round-trip test: creates a clustered catalog-managed table in an in-memory store, reloads
/// the snapshot, and checks every group of properties produced for UC.
#[tokio::test]
168+
async fn test_get_final_required_properties_for_uc() {
169+
let storage = Arc::new(InMemory::new());
170+
let engine = DefaultEngineBuilder::new(storage).build();
171+
let table_path = "memory:///test_table/";
172+
let schema = Arc::new(
173+
StructType::try_new(vec![
174+
StructField::new("id", DataType::INTEGER, false),
175+
StructField::new("region", DataType::STRING, true),
176+
])
177+
.unwrap(),
178+
);
179+
180+
// Create a UC catalog-managed table with clustering
181+
let disk_props = get_required_properties_for_disk("test-table-id-456");
182+
let _ = create_table(table_path, schema, "Test/1.0")
183+
.with_table_properties(disk_props)
184+
.with_data_layout(DataLayout::clustered(["region"]))
185+
.build(&engine, Box::new(MockCatalogCommitter))
186+
.unwrap()
187+
.commit(&engine)
188+
.unwrap();
189+
190+
// Reload from storage so the properties come from the committed log, not the builder.
let snapshot = Snapshot::builder_for(table_path).build(&engine).unwrap();
191+
assert_eq!(snapshot.version(), 0);
192+
let uc_props = get_final_required_properties_for_uc(&snapshot, &engine).unwrap();
193+
194+
// Protocol-derived properties
195+
assert_eq!(uc_props["delta.minReaderVersion"], "3");
196+
assert_eq!(uc_props["delta.minWriterVersion"], "7");
197+
assert_eq!(uc_props["delta.feature.catalogManaged"], "supported");
198+
assert_eq!(uc_props["delta.feature.vacuumProtocolCheck"], "supported");
199+
assert_eq!(uc_props["delta.feature.inCommitTimestamp"], "supported");
200+
assert_eq!(uc_props["delta.feature.clustering"], "supported");
201+
202+
// Metadata configuration
203+
assert_eq!(uc_props["io.unitycatalog.tableId"], "test-table-id-456");
204+
205+
// UC-specific properties
206+
assert_eq!(uc_props["delta.lastUpdateVersion"], "0");
207+
// The exact timestamp is not known in advance; only check it parses and is positive.
let timestamp: i64 = uc_props["delta.lastCommitTimestamp"]
208+
.parse()
209+
.expect("timestamp should be a valid i64");
210+
assert!(
211+
timestamp > 0,
212+
"ICT timestamp should be non-zero, got {timestamp}"
213+
);
214+
215+
// Clustering columns: serialized as [[col1], [col2]] (array of path arrays)
216+
let parsed: Vec<Vec<String>> =
217+
serde_json::from_str(&uc_props["clusteringColumns"]).unwrap();
218+
assert_eq!(parsed, vec![vec!["region"]]);
219+
}
220+
221+
/// Checks the `clusteringColumns` JSON when clustering on more than one column, one of which
/// is a field nested inside a struct column.
#[tokio::test]
222+
async fn test_clustering_columns_serialization_multiple_and_nested() {
223+
let storage = Arc::new(InMemory::new());
224+
let engine = DefaultEngineBuilder::new(storage).build();
225+
let table_path = "memory:///test_clustering_ser/";
226+
// Nested struct used to exercise a multi-component clustering column path.
let address_struct = StructType::new_unchecked(vec![
227+
StructField::new("city", DataType::STRING, true),
228+
StructField::new("zip", DataType::STRING, true),
229+
]);
230+
let schema = Arc::new(
231+
StructType::try_new(vec![
232+
StructField::new("id", DataType::INTEGER, false),
233+
StructField::new("region", DataType::STRING, true),
234+
StructField::new("address", DataType::Struct(Box::new(address_struct)), true),
235+
])
236+
.unwrap(),
237+
);
238+
239+
use delta_kernel::expressions::ColumnName;
240+
241+
let disk_props = get_required_properties_for_disk("test-table-id");
242+
// Cluster on one top-level column and one nested field (`address.city`).
let _ = create_table(table_path, schema, "Test/1.0")
243+
.with_table_properties(disk_props)
244+
.with_data_layout(DataLayout::Clustered {
245+
columns: vec![
246+
ColumnName::new(["region"]),
247+
ColumnName::new(["address", "city"]),
248+
],
249+
})
250+
.build(&engine, Box::new(MockCatalogCommitter))
251+
.unwrap()
252+
.commit(&engine)
253+
.unwrap();
254+
255+
let snapshot = Snapshot::builder_for(table_path).build(&engine).unwrap();
256+
let uc_props = get_final_required_properties_for_uc(&snapshot, &engine).unwrap();
257+
258+
// Clustering columns serialized as array of path arrays:
259+
// [["region"], ["address", "city"]]
260+
let raw_json = &uc_props["clusteringColumns"];
261+
let parsed: Vec<Vec<String>> = serde_json::from_str(raw_json).unwrap();
262+
assert_eq!(
263+
parsed,
264+
vec![vec!["region"], vec!["address", "city"]],
265+
"Raw JSON: {raw_json}"
266+
);
267+
}
268+
269+
/// Checks that `get_final_required_properties_for_uc` returns an error mentioning "version 0"
/// when given a snapshot at version 1.
#[tokio::test]
270+
async fn test_get_final_required_properties_for_uc_rejects_non_zero_version() {
271+
let storage = Arc::new(InMemory::new());
272+
let engine = DefaultEngineBuilder::new(storage).build();
273+
let table_path = "memory:///test_version_check/";
274+
let schema = Arc::new(
275+
StructType::try_new(vec![StructField::new("id", DataType::INTEGER, false)]).unwrap(),
276+
);
277+
278+
// Create a table (version 0) and append (version 1)
279+
let disk_props = get_required_properties_for_disk("test-table-id");
280+
let _ = create_table(table_path, schema, "Test/1.0")
281+
.with_table_properties(disk_props)
282+
.build(&engine, Box::new(MockCatalogCommitter))
283+
.unwrap()
284+
.commit(&engine)
285+
.unwrap();
286+
let v0_snapshot = Snapshot::builder_for(table_path).build(&engine).unwrap();
287+
// Commit a second transaction on top of version 0 to advance the table to version 1.
let result = v0_snapshot
288+
.transaction(Box::new(MockCatalogCommitter), &engine)
289+
.unwrap()
290+
.commit(&engine)
291+
.unwrap();
292+
assert!(result.is_committed());
293+
294+
// Load snapshot at version 1
295+
let snapshot = Snapshot::builder_for(table_path).build(&engine).unwrap();
296+
assert_eq!(snapshot.version(), 1);
297+
298+
// Should fail because version != 0
299+
let err = get_final_required_properties_for_uc(&snapshot, &engine).unwrap_err();
300+
assert!(
301+
err.to_string().contains("version 0"),
302+
"expected version 0 error, got: {err}"
303+
);
304+
}
305+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub(crate) mod create_table;
2+
3+
pub use create_table::{get_final_required_properties_for_uc, get_required_properties_for_disk};

kernel/src/schema/mod.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -746,14 +746,28 @@ impl StructType {
746746
&'a self,
747747
col: &ColumnName,
748748
) -> DeltaResult<Vec<&'a StructField>> {
749+
self.walk_column_fields_by(col, |s, name| s.field(name))
750+
}
751+
752+
/// Helper to walk through nested columns. For each path component in `col`, calls
753+
/// `find_field(current_struct, component)` to locate the matching field, then descends
754+
/// into the next nested struct. Returns references to all [`StructField`]s along the path.
755+
pub(crate) fn walk_column_fields_by<'a, F>(
756+
&'a self,
757+
col: &ColumnName,
758+
find_field: F,
759+
) -> DeltaResult<Vec<&'a StructField>>
760+
where
761+
F: for<'b> Fn(&'b StructType, &str) -> Option<&'b StructField>,
762+
{
749763
let path = col.path();
750764
if path.is_empty() {
751765
return Err(Error::generic("Column path cannot be empty"));
752766
}
753767
let mut current_struct = self;
754768
let mut fields = Vec::with_capacity(path.len());
755769
for (i, field_name) in path.iter().enumerate() {
756-
let field = current_struct.field(field_name).ok_or_else(|| {
770+
let field = find_field(current_struct, field_name).ok_or_else(|| {
757771
Error::generic(format!(
758772
"Could not resolve column '{col}': field '{field_name}' not found in schema"
759773
))

0 commit comments

Comments
 (0)