diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py
index 08452aa936d0..35cd7cb96d3b 100644
--- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py
@@ -94,7 +94,8 @@
     UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS,
     UNITY_CATALOG_GET_ALL_TABLE_TAGS,
     UNITY_CATALOG_GET_CATALOGS_TAGS,
     UNITY_CATALOG_GET_TABLE_DDL,
+    UNITY_CATALOG_TABLE_CONSTRAINTS,
 )
 from metadata.utils import fqn
 from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
@@ -339,6 +340,42 @@ def yield_database_schema(self, schema_name: str) -> Iterable[Either[CreateDatab
         yield Either(right=schema_request)
         self.register_record_schema_request(schema_request=schema_request)
 
+    def _get_tables_with_constraints(self) -> set[tuple[str, str, str]]:
+        """
+        Fetch the tables of the current catalog and schema that declare constraints.
+
+        Returns a set of (catalog, schema, table) name tuples. The set is empty
+        when the context has no catalog or schema name, or when the query fails
+        (the failure is logged as a warning instead of being raised).
+        """
+        schema_name = self.context.get().database_schema
+        catalog_name = self.context.get().database
+        tables_with_constraints: set[tuple[str, str, str]] = set()
+        if catalog_name is None or schema_name is None:
+            # The lookup is always scoped to one schema; without both names there is nothing to query.
+            return tables_with_constraints
+
+        # Past the guard both filters always apply. Values are sent as bound
+        # parameters rather than being interpolated into the SQL text.
+        sql = (
+            UNITY_CATALOG_TABLE_CONSTRAINTS
+            + " AND table_catalog = :catalog_name"
+            + " AND table_schema = :schema_name"
+        )
+        params = {"catalog_name": catalog_name, "schema_name": schema_name}
+
+        try:
+            for row in self.sql_connection.execute(text(sql), params):
+                table_identifier = (row.table_catalog, row.table_schema, row.table_name)
+                tables_with_constraints.add(table_identifier)
+                logger.debug(f"Table with constraints: {table_identifier}")
+        except Exception as exc:
+            logger.debug(traceback.format_exc())
+            logger.warning(
+                f"Error fetching table constraints for catalog [{catalog_name}], schema [{schema_name}]: {exc}"
+            )
+        return tables_with_constraints
+
     def get_tables_name_and_type(self) -> Iterable[Tuple[str, TableType]]:  # noqa: UP006
         """
         Handle table and views.
@@ -356,10 +393,23 @@ def get_tables_name_and_type(self) -> Iterable[Tuple[str, TableType]]: # noqa:
         if self.incremental.enabled and self.incremental_table_processor:
             yield from self._get_incremental_tables(catalog_name, schema_name)
         else:
+            tables_with_constraints = self._get_tables_with_constraints()
             for table in self.client.tables.list(
                 catalog_name=catalog_name,
                 schema_name=schema_name,
             ):
+                if (table.catalog_name, table.schema_name, table.name) in tables_with_constraints:
+                    # Only tables with constraints require full fetch; list() doesn't include constraint details
+                    try:
+                        table = self.client.tables.get(table.full_name)
+                    except Exception as exc:
+                        msg = (
+                            f"Unexpected exception in fetching constraints for table [{table.full_name}]: {exc}. "
+                            "Constraints will be ignored."
+                        )
+                        logger.debug(traceback.format_exc())
+                        logger.warning(msg)
+                        self.status.warning(table.name, msg)
                 yield from self._process_table(table, catalog_name, schema_name)
 
     def _get_incremental_tables(self, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, TableType]]:  # noqa: UP006
diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py
index 27f48e0dbcf6..73af2284957d 100644
--- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py
+++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py
@@ -136,3 +136,11 @@
     WHERE 1=0
     """
 )
+
+UNITY_CATALOG_TABLE_CONSTRAINTS = textwrap.dedent(
+    """
+    SELECT DISTINCT table_catalog, table_schema, table_name
+    FROM system.information_schema.table_constraints
+    WHERE 1=1
+    """
+)