From 107d4b56b95857512c55e1560415d244212d7c5e Mon Sep 17 00:00:00 2001 From: "m.migdisoglu" Date: Thu, 18 Jun 2026 00:35:55 +0200 Subject: [PATCH 1/4] add support for databricks contraints in non-incremental ingestion --- .../source/database/unitycatalog/metadata.py | 41 ++++++++++++++++++- .../source/database/unitycatalog/queries.py | 8 ++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index 08452aa936d0..639d512431a2 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -94,7 +94,7 @@ UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS, UNITY_CATALOG_GET_ALL_TABLE_TAGS, UNITY_CATALOG_GET_CATALOGS_TAGS, - UNITY_CATALOG_GET_TABLE_DDL, + UNITY_CATALOG_GET_TABLE_DDL, UNITY_CATALOG_TABLE_CONSTRAINTS, ) from metadata.utils import fqn from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table @@ -260,6 +260,7 @@ def _set_incremental_table_processor(self, catalog: str) -> None: with self._state_lock: self.incremental_table_processor = incremental_table_processor + def yield_database(self, database_name: str) -> Iterable[Either[CreateDatabaseRequest]]: """ From topology. @@ -339,6 +340,40 @@ def yield_database_schema(self, schema_name: str) -> Iterable[Either[CreateDatab yield Either(right=schema_request) self.register_record_schema_request(schema_request=schema_request) + def _get_tables_with_constraints(self) -> set[tuple[str, str, str]]: + """ + Build and execute SQL query to fetch table constraints. + Handles cases where catalog_name and/or schema_name may be None. + """ + schema_name = self.context.get().database_schema + catalog_name = self.context.get().database + + sql = UNITY_CATALOG_TABLE_CONSTRAINTS + params = {} + # Build WHERE clause with proper handling of None values + + if catalog_name is not None: + sql += " AND table_catalog = :catalog_name" + params["catalog_name"] = catalog_name + if schema_name is not None: + sql += " AND table_schema = :schema_name" + params["schema_name"] = schema_name + + tables_with_constraints = set() + try: + cursor = self.sql_connection.execute(text(sql), params) + # Collect unique tables with constraints + for row in cursor: + table_identifier = (row.table_catalog, row.table_schema, row.table_name) + tables_with_constraints.add(table_identifier) + logger.debug(f"Table with constraints: {table_identifier}") + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error fetching table constraints for catalog [{catalog_name}], schema [{schema_name}]: {exc}" + ) + return tables_with_constraints + def get_tables_name_and_type(self) -> Iterable[Tuple[str, TableType]]: # noqa: UP006 """ Handle table and views. @@ -356,10 +391,14 @@ def get_tables_name_and_type(self) -> Iterable[Tuple[str, TableType]]: # noqa: if self.incremental.enabled and self.incremental_table_processor: yield from self._get_incremental_tables(catalog_name, schema_name) else: + table_with_constraints = self._get_tables_with_constraints() for table in self.client.tables.list( catalog_name=catalog_name, schema_name=schema_name, ): + if (table.catalog_name, table.schema_name, table.name) in table_with_constraints: + # Only tables with constraints require full fetch; list() doesn't include constraint details + table = self.client.tables.get(table.full_name) yield from self._process_table(table, catalog_name, schema_name) def _get_incremental_tables(self, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, TableType]]: # noqa: UP006 diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py index 27f48e0dbcf6..f24071dd4cc6 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py @@ -136,3 +136,11 @@ WHERE 1=0 """ ) + +UNITY_CATALOG_TABLE_CONSTRAINTS = textwrap.dedent( + """ + SELECT table_catalog, table_schema, table_name, constraint_name, constraint_type + FROM system.information_schema.table_constraints + WHERE 1=1 + """ +) \ No newline at end of file From 4e9f3802ec26eb8bcbf19e3704f8f6bd6a5b7bae Mon Sep 17 00:00:00 2001 From: "m.migdisoglu" Date: Thu, 18 Jun 2026 01:03:45 +0200 Subject: [PATCH 2/4] fix exception handling, code cleanup --- .../source/database/unitycatalog/metadata.py | 13 ++++++++++--- .../source/database/unitycatalog/queries.py | 4 ++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index 639d512431a2..0441002a7d91 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -350,7 +350,6 @@ def _get_tables_with_constraints(self) -> set[tuple[str, str, str]]: sql = UNITY_CATALOG_TABLE_CONSTRAINTS params = {} - # Build WHERE clause with proper handling of None values if catalog_name is not None: sql += " AND table_catalog = :catalog_name" @@ -362,7 +361,6 @@ def _get_tables_with_constraints(self) -> set[tuple[str, str, str]]: tables_with_constraints = set() try: cursor = self.sql_connection.execute(text(sql), params) - # Collect unique tables with constraints for row in cursor: table_identifier = (row.table_catalog, row.table_schema, row.table_name) tables_with_constraints.add(table_identifier) @@ -398,7 +396,16 @@ def get_tables_name_and_type(self) -> Iterable[Tuple[str, TableType]]: # noqa: ): if (table.catalog_name, table.schema_name, table.name) in table_with_constraints: # Only tables with constraints require full fetch; list() doesn't include constraint details - table = self.client.tables.get(table.full_name) + try: + table = self.client.tables.get(table.full_name) + except Exception as exc: + self.status.failed( + StackTraceError( + name=table.full_name, + error=f"Unexpected exception fetching the contraints on [{table.full_name}]: {exc}", + stackTrace=traceback.format_exc(), + ) + ) yield from self._process_table(table, catalog_name, schema_name) def _get_incremental_tables(self, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, TableType]]: # noqa: UP006 diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py index f24071dd4cc6..73af2284957d 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py @@ -139,8 +139,8 @@ UNITY_CATALOG_TABLE_CONSTRAINTS = textwrap.dedent( """ - SELECT table_catalog, table_schema, table_name, constraint_name, constraint_type + SELECT DISTINCT table_catalog, table_schema, table_name FROM system.information_schema.table_constraints WHERE 1=1 """ -) \ No newline at end of file +) From 2e62c1c41423b306a52e1c11808a045d51090f81 Mon Sep 17 00:00:00 2001 From: "m.migdisoglu" Date: Thu, 18 Jun 2026 01:30:16 +0200 Subject: [PATCH 3/4] failure to fetch the contraints is not an error but a warning --- .../source/database/unitycatalog/metadata.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index 0441002a7d91..f479c73402b5 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -399,13 +399,13 @@ def get_tables_name_and_type(self) -> Iterable[Tuple[str, TableType]]: # noqa: try: table = self.client.tables.get(table.full_name) except Exception as exc: - self.status.failed( - StackTraceError( - name=table.full_name, - error=f"Unexpected exception fetching the contraints on [{table.full_name}]: {exc}", - stackTrace=traceback.format_exc(), - ) + msg = ( + f"Unexpected exception in fetching constraints " + f"(table [{table.full_name}]: {exc}. " + f"Contraints will be ignored." ) + logger.warning(msg) + self.status.warning(table.name, msg) yield from self._process_table(table, catalog_name, schema_name) def _get_incremental_tables(self, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, TableType]]: # noqa: UP006 From 0d5a5f0a77fbc9f13246525caaf2bd28bb0cd1ee Mon Sep 17 00:00:00 2001 From: "m.migdisoglu" Date: Thu, 18 Jun 2026 01:50:48 +0200 Subject: [PATCH 4/4] short circuit contraints collection if no schema or catalog is present --- .../ingestion/source/database/unitycatalog/metadata.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index f479c73402b5..35cd7cb96d3b 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -347,6 +347,9 @@ def _get_tables_with_constraints(self) -> set[tuple[str, str, str]]: """ schema_name = self.context.get().database_schema catalog_name = self.context.get().database + tables_with_constraints = set() + if catalog_name is None or schema_name is None: + return tables_with_constraints sql = UNITY_CATALOG_TABLE_CONSTRAINTS params = {} @@ -358,7 +361,6 @@ def _get_tables_with_constraints(self) -> set[tuple[str, str, str]]: sql += " AND table_schema = :schema_name" params["schema_name"] = schema_name - tables_with_constraints = set() try: cursor = self.sql_connection.execute(text(sql), params) for row in cursor: @@ -401,8 +403,8 @@ def get_tables_name_and_type(self) -> Iterable[Tuple[str, TableType]]: # noqa: except Exception as exc: msg = ( f"Unexpected exception in fetching constraints " - f"(table [{table.full_name}]: {exc}. " - f"Contraints will be ignored." + f"Constraints will be ignored." + f"table [{table.full_name}]: {exc}. " ) logger.warning(msg) self.status.warning(table.name, msg)