Skip to content

Commit 8a3bed2

Browse files
authored
feat: add FFI function to expose snapshot's timestamp (delta-io#2274)
## What changes are proposed in this pull request? In delta-io#2266 , the `Snapshot::get_timestamp` was added, providing the capability to read the timestamp of a Snapshot. This PR added an FFI function `snapshot_timestamp`, allowing other language to use the `Snapshot::get_timestamp` method. ## How was this change tested? New unit test was added.
1 parent d309338 commit 8a3bed2

2 files changed

Lines changed: 117 additions & 2 deletions

File tree

ffi/src/lib.rs

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,30 @@ pub unsafe extern "C" fn version(snapshot: Handle<SharedSnapshot>) -> u64 {
10051005
snapshot.version()
10061006
}
10071007

1008+
/// Get the timestamp of the specified snapshot in milliseconds since the Unix epoch.
1009+
///
1010+
/// When In-Commit Timestamp (ICT) is enabled, returns the ICT value from the commit's
1011+
/// `CommitInfo` action. Otherwise, falls back to the filesystem last-modified time of
1012+
/// the latest commit file.
1013+
///
1014+
/// Returns an error if the commit file is missing, the ICT configuration is invalid, or the
1015+
/// ICT value cannot be read.
1016+
///
1017+
/// # Safety
1018+
///
1019+
/// Caller is responsible for passing valid snapshot handle and engine handle.
1020+
#[no_mangle]
1021+
pub unsafe extern "C" fn snapshot_timestamp(
1022+
snapshot: Handle<SharedSnapshot>,
1023+
engine: Handle<SharedExternEngine>,
1024+
) -> ExternResult<i64> {
1025+
let engine_ref = unsafe { engine.as_ref() };
1026+
let snapshot = unsafe { snapshot.as_ref() };
1027+
snapshot
1028+
.get_timestamp(engine_ref.engine().as_ref())
1029+
.into_extern_result(&engine_ref)
1030+
}
1031+
10081032
/// Get the logical schema of the specified snapshot
10091033
///
10101034
/// # Safety
@@ -1205,15 +1229,17 @@ mod tests {
12051229
use delta_kernel::object_store::memory::InMemory;
12061230
use delta_kernel::object_store::path::Path;
12071231
use delta_kernel::object_store::ObjectStore;
1232+
use delta_kernel::schema::StructType;
12081233
use rstest::rstest;
12091234
use serde_json::Value;
12101235
use std::collections::HashMap;
12111236
#[cfg(feature = "catalog-managed")]
12121237
use test_utils::add_staged_commit;
12131238
use test_utils::{
12141239
actions_to_string, actions_to_string_partitioned, actions_to_string_with_metadata,
1215-
add_commit, TestAction, METADATA, METADATA_WITH_TABLE_PROPERTIES,
1240+
add_commit, create_table, TestAction, METADATA, METADATA_WITH_TABLE_PROPERTIES,
12161241
};
1242+
use url::Url;
12171243

12181244
#[no_mangle]
12191245
extern "C" fn allocate_null_err(_: KernelError, _: KernelStringSlice) -> *mut EngineError {
@@ -1317,6 +1343,95 @@ mod tests {
13171343
Ok(())
13181344
}
13191345

1346+
// TODO: (PR #2307) will introduce a helper function for setting up storage, engine.
1347+
// The test will need to refactor to use the helper function.
1348+
#[tokio::test]
1349+
async fn test_snapshot_timestamp_no_ict() -> Result<(), Box<dyn std::error::Error>> {
1350+
let storage = Arc::new(InMemory::new());
1351+
let table_root = "memory:///test_table/";
1352+
add_commit(
1353+
table_root,
1354+
storage.as_ref(),
1355+
0,
1356+
actions_to_string(vec![TestAction::Metadata]),
1357+
)
1358+
.await?;
1359+
1360+
let engine = DefaultEngineBuilder::new(storage.clone()).build();
1361+
let engine = engine_to_handle(Arc::new(engine), allocate_err);
1362+
let snap = unsafe {
1363+
ok_or_panic(snapshot(
1364+
kernel_string_slice!(table_root),
1365+
engine.shallow_copy(),
1366+
))
1367+
};
1368+
1369+
let ts = unsafe {
1370+
ok_or_panic(snapshot_timestamp(
1371+
snap.shallow_copy(),
1372+
engine.shallow_copy(),
1373+
))
1374+
};
1375+
// ICT is not enabled -- falls back to commit file mtime (written "now").
1376+
let now_ms = std::time::SystemTime::now()
1377+
.duration_since(std::time::UNIX_EPOCH)
1378+
.unwrap()
1379+
.as_millis() as i64;
1380+
let two_days_ms = 2 * 24 * 60 * 60 * 1000_i64;
1381+
assert!(
1382+
(now_ms - two_days_ms..=now_ms).contains(&ts),
1383+
"timestamp {ts} not within 2 days of now {now_ms}"
1384+
);
1385+
1386+
unsafe { free_snapshot(snap) }
1387+
unsafe { free_engine(engine) }
1388+
Ok(())
1389+
}
1390+
1391+
// TODO: (PR #2307) will introduce a helper function for setting up storage, engine.
1392+
// The test will need to refactor to use the helper function.
1393+
#[tokio::test]
1394+
async fn test_snapshot_timestamp_ict_enabled() -> Result<(), Box<dyn std::error::Error>> {
1395+
let storage = Arc::new(InMemory::new());
1396+
let table_root = "memory:///test_table/";
1397+
1398+
// create_table with "inCommitTimestamp" in writer_features sets up:
1399+
// - protocol v3.7 with writerFeatures=["inCommitTimestamp"]
1400+
// - metadata config: enableInCommitTimestamps=true, enablement version/timestamp
1401+
// - commitInfo with inCommitTimestamp=1612345678 (fixed test value)
1402+
create_table(
1403+
storage.clone(),
1404+
Url::parse(table_root)?,
1405+
Arc::new(StructType::try_new([]).unwrap()),
1406+
&[],
1407+
true,
1408+
vec![],
1409+
vec!["inCommitTimestamp"],
1410+
)
1411+
.await?;
1412+
1413+
let engine = DefaultEngineBuilder::new(storage.clone()).build();
1414+
let engine = engine_to_handle(Arc::new(engine), allocate_err);
1415+
let snap = unsafe {
1416+
ok_or_panic(snapshot(
1417+
kernel_string_slice!(table_root),
1418+
engine.shallow_copy(),
1419+
))
1420+
};
1421+
1422+
let ts = unsafe {
1423+
ok_or_panic(snapshot_timestamp(
1424+
snap.shallow_copy(),
1425+
engine.shallow_copy(),
1426+
))
1427+
};
1428+
assert_eq!(ts, 1612345678_i64);
1429+
1430+
unsafe { free_snapshot(snap) }
1431+
unsafe { free_engine(engine) }
1432+
Ok(())
1433+
}
1434+
13201435
#[rstest]
13211436
#[case(
13221437
METADATA_WITH_TABLE_PROPERTIES,

kernel/src/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1123,7 +1123,7 @@ impl Snapshot {
11231123
/// [`get_in_commit_timestamp`]: Self::get_in_commit_timestamp
11241124
#[allow(unused)]
11251125
#[instrument(parent = &self.span, name = "snap.get_ts", skip_all, err)]
1126-
pub(crate) fn get_timestamp(&self, engine: &dyn Engine) -> DeltaResult<i64> {
1126+
pub fn get_timestamp(&self, engine: &dyn Engine) -> DeltaResult<i64> {
11271127
match self
11281128
.table_configuration()
11291129
.in_commit_timestamp_enablement()?

0 commit comments

Comments
 (0)