-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Fixes 29147: add support for databricks contraints in non-incremental ingestion #29148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
107d4b5
4e9f380
2e62c1c
0d5a5f0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -94,7 +94,7 @@ | |
| UNITY_CATALOG_GET_ALL_TABLE_COLUMNS_TAGS, | ||
| UNITY_CATALOG_GET_ALL_TABLE_TAGS, | ||
| UNITY_CATALOG_GET_CATALOGS_TAGS, | ||
| UNITY_CATALOG_GET_TABLE_DDL, | ||
| UNITY_CATALOG_GET_TABLE_DDL, UNITY_CATALOG_TABLE_CONSTRAINTS, | ||
| ) | ||
| from metadata.utils import fqn | ||
| from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table | ||
|
|
@@ -260,6 +260,7 @@ def _set_incremental_table_processor(self, catalog: str) -> None: | |
| with self._state_lock: | ||
| self.incremental_table_processor = incremental_table_processor | ||
|
|
||
|
|
||
| def yield_database(self, database_name: str) -> Iterable[Either[CreateDatabaseRequest]]: | ||
| """ | ||
| From topology. | ||
|
|
@@ -339,6 +340,40 @@ def yield_database_schema(self, schema_name: str) -> Iterable[Either[CreateDatab | |
| yield Either(right=schema_request) | ||
| self.register_record_schema_request(schema_request=schema_request) | ||
|
|
||
| def _get_tables_with_constraints(self) -> set[tuple[str, str, str]]: | ||
| """ | ||
| Build and execute SQL query to fetch table constraints. | ||
| Handles cases where catalog_name and/or schema_name may be None. | ||
| """ | ||
| schema_name = self.context.get().database_schema | ||
| catalog_name = self.context.get().database | ||
| tables_with_constraints = set() | ||
| if catalog_name is None or schema_name is None: | ||
| return tables_with_constraints | ||
|
|
||
| sql = UNITY_CATALOG_TABLE_CONSTRAINTS | ||
| params = {} | ||
|
|
||
| if catalog_name is not None: | ||
| sql += " AND table_catalog = :catalog_name" | ||
| params["catalog_name"] = catalog_name | ||
| if schema_name is not None: | ||
| sql += " AND table_schema = :schema_name" | ||
| params["schema_name"] = schema_name | ||
|
|
||
| try: | ||
| cursor = self.sql_connection.execute(text(sql), params) | ||
|
gitar-bot[bot] marked this conversation as resolved.
|
||
| for row in cursor: | ||
| table_identifier = (row.table_catalog, row.table_schema, row.table_name) | ||
| tables_with_constraints.add(table_identifier) | ||
| logger.debug(f"Table with constraints: {table_identifier}") | ||
| except Exception as exc: | ||
| logger.debug(traceback.format_exc()) | ||
| logger.warning( | ||
| f"Error fetching table constraints for catalog [{catalog_name}], schema [{schema_name}]: {exc}" | ||
| ) | ||
| return tables_with_constraints | ||
|
|
||
| def get_tables_name_and_type(self) -> Iterable[Tuple[str, TableType]]: # noqa: UP006 | ||
| """ | ||
| Handle table and views. | ||
|
|
@@ -356,10 +391,23 @@ def get_tables_name_and_type(self) -> Iterable[Tuple[str, TableType]]: # noqa: | |
| if self.incremental.enabled and self.incremental_table_processor: | ||
| yield from self._get_incremental_tables(catalog_name, schema_name) | ||
| else: | ||
| table_with_constraints = self._get_tables_with_constraints() | ||
| for table in self.client.tables.list( | ||
| catalog_name=catalog_name, | ||
| schema_name=schema_name, | ||
| ): | ||
| if (table.catalog_name, table.schema_name, table.name) in table_with_constraints: | ||
| # Only tables with constraints require full fetch; list() doesn't include constraint details | ||
| try: | ||
| table = self.client.tables.get(table.full_name) | ||
| except Exception as exc: | ||
| msg = ( | ||
| f"Unexpected exception in fetching constraints " | ||
| f"Constraints will be ignored." | ||
| f"table [{table.full_name}]: {exc}. " | ||
| ) | ||
|
Comment on lines
+404
to
+408
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Quality: Malformed warning message: unbalanced bracket and typoThe warning string built when Fix the unbalanced brackets and the 'Contraints' typo.:
Check the box to apply the fix or reply for a change | Was this helpful? React with 👍 / 👎
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment is based on an outdated version of the code. The typo and the paranthesis mentioned in the comment doesn't exist. |
||
| logger.warning(msg) | ||
| self.status.warning(table.name, msg) | ||
| yield from self._process_table(table, catalog_name, schema_name) | ||
|
gitar-bot[bot] marked this conversation as resolved.
greptile-apps[bot] marked this conversation as resolved.
|
||
|
|
||
| def _get_incremental_tables(self, catalog_name: str, schema_name: str) -> Iterable[Tuple[str, TableType]]: # noqa: UP006 | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Quality: No tests added for new constraint-fetch logic
The PR adds a new code path (
_get_tables_with_constraintsand the conditional full-fetch inget_tables_name_and_type) but includes no unit tests, and the PR checklist for tests is unchecked. Worth adding a test that mocksclient.tables.list/getand the constraints query to verify only tables present in the constraints set trigger the extraget()call, and that constraints are propagated into the resulting table request.Was this helpful? React with 👍 / 👎