diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index bbb6a71cd0e..dd6937e3037 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -6,7 +6,7 @@ from abc import abstractmethod from datetime import timedelta from threading import Lock -from typing import List, Optional +from typing import Callable, List, Optional from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource @@ -29,8 +29,15 @@ class CachingRegistry(BaseRegistry): - def __init__(self, project: str, cache_ttl_seconds: int, cache_mode: str): + def __init__( + self, + project: str, + cache_ttl_seconds: int, + cache_mode: str, + on_cache_refresh_failure: Optional[Callable[[Exception], None]] = None, + ): self.cache_mode = cache_mode + self._on_cache_refresh_failure = on_cache_refresh_failure self.cached_registry_proto = RegistryProto() self._refresh_lock = Lock() self.cached_registry_proto_ttl = timedelta( @@ -471,7 +478,8 @@ def refresh_loop(): if not self._stop_event.is_set(): self.refresh() except Exception as e: - logger.exception("Exception in refresh_loop: %s", e) + if self._on_cache_refresh_failure: + self._on_cache_refresh_failure(e) try: self.registry_refresh_thread = threading.Thread( @@ -483,6 +491,7 @@ def refresh_loop(): ) except Exception as e: logger.exception("Failed to start registry refresh thread: %s", e) + raise e def _exit_handler(self): logger.info("Exiting, setting stop event for registry cache refresh thread") diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 3954bcaea46..0b1f6ff97ad 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -264,6 +264,7 @@ def __init__( registry_config, project: str, repo_path: Optional[Path], + on_cache_refresh_failure: Optional[Callable[[Exception], None]] = None, ): assert registry_config is not None and isinstance( registry_config, SqlRegistryConfig @@ -296,6 +297,7 @@ def __init__( project=project, cache_ttl_seconds=registry_config.cache_ttl_seconds, cache_mode=registry_config.cache_mode, + on_cache_refresh_failure=on_cache_refresh_failure, ) def _sync_feast_metadata_to_projects_table(self):