From fd12f12620d3cec80c67373160a0b933fdaf7ba5 Mon Sep 17 00:00:00 2001 From: Vignesh Narayanaswamy Date: Mon, 1 Jun 2026 13:08:53 -0700 Subject: [PATCH] fix(snowflake): dedup model buffer by hash before flush A single Ledger.add() pass buffers the same new model twice (register() saves it, then update_model() saves it again). The flush MERGE is idempotent only once the target row exists; for a brand-new model the empty-target INSERT fires per source row, so an undeduped buffer writes duplicate rows. Dedup the buffer by model_hash (last write wins) in _flush_models before the MERGE. Surfaced by a first-time bulk insert of ~500 new nodes, which produced exactly 2x rows. Existing nodes were unaffected (register() early-returns from cache, so only update_model() buffers them, and MERGE updates in place). Co-Authored-By: Claude Opus 4.8 (1M context) --- src/model_ledger/backends/snowflake.py | 9 +++++ tests/test_backends/test_snowflake_ledger.py | 37 ++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/src/model_ledger/backends/snowflake.py b/src/model_ledger/backends/snowflake.py index a823df5..35535df 100644 --- a/src/model_ledger/backends/snowflake.py +++ b/src/model_ledger/backends/snowflake.py @@ -136,6 +136,15 @@ def flush(self) -> None: def _flush_models(self) -> None: if not self._model_buffer: return + # Dedup by model_hash (last write wins). A single Ledger.add() pass can + # buffer the same new model twice — register() saves it, then + # update_model() saves it again. The MERGE is idempotent only once the + # target row exists; for a brand-new model the empty-target INSERT fires + # per source row, so an undeduped buffer produces duplicate rows. + deduped: dict[str, ModelRef] = {} + for model in self._model_buffer: + deduped[model.model_hash] = model + self._model_buffer = list(deduped.values()) if self._flush_models_pandas(): self._model_buffer.clear() return diff --git a/tests/test_backends/test_snowflake_ledger.py b/tests/test_backends/test_snowflake_ledger.py index 5c6d16c..6ffaef6 100644 --- a/tests/test_backends/test_snowflake_ledger.py +++ b/tests/test_backends/test_snowflake_ledger.py @@ -285,3 +285,40 @@ def fake_exec(session, sql: str, *args, **kwargs): with pytest.raises(RuntimeError, match="insufficient privileges"): SnowflakeLedgerBackend(connection=object(), schema="TEST.LEDGER") + + +def test_flush_dedups_model_buffer_by_hash(): + """register() and update_model() both buffer the same new model in one + Ledger.add() pass. Without dedup, the MERGE inserts both copies because the + target row doesn't exist yet (empty-target INSERT fires per source row), + producing duplicate rows. _flush_models must collapse the buffer by + model_hash so each model reaches the MERGE exactly once. + """ + from model_ledger.backends.snowflake import SnowflakeLedgerBackend + + seen_hashes: list[str] = [] + + class RecordingSession: + """Captures every model_hash that appears in a MODELS MERGE source.""" + + def sql(self, query: str, params: Any = None) -> MockCollectResult: + if "MERGE INTO" in query.upper() and ".MODELS " in query.upper(): + for m in re.finditer(r"'([^']+)'\s+AS\s+model_hash", query): + seen_hashes.append(m.group(1)) + return MockCollectResult([]) + + backend = SnowflakeLedgerBackend(schema="TEST_SCHEMA", connection=RecordingSession()) + model = ModelRef( + name="fraud_scorer", + owner="risk-team", + model_type="scoring_model", + tier="unclassified", + purpose="", + ) + backend.save_model(model) # register() path + backend.save_model(model) # update_model() path (same hash) + backend.flush() + + assert seen_hashes.count(model.model_hash) == 1, ( + f"model written {seen_hashes.count(model.model_hash)}x to MERGE source, expected 1" + )