From 6a1445ad64857e206835986eaa3e154e1288e64d Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 5 Mar 2025 13:31:20 -0600 Subject: [PATCH 1/5] fix: Pass cache refresh error handling function to CachingRegistry --- sdk/python/feast/infra/registry/caching_registry.py | 10 +++++++--- sdk/python/feast/infra/registry/sql.py | 2 ++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index bbb6a71cd0e..9403bebb65a 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -6,7 +6,7 @@ from abc import abstractmethod from datetime import timedelta from threading import Lock -from typing import List, Optional +from typing import Callable, List, Optional from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource @@ -24,13 +24,15 @@ from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView from feast.utils import _utc_now +from sdk.python.feast.expediagroup.caching_registry.functions import start_thread_safely logger = logging.getLogger(__name__) class CachingRegistry(BaseRegistry): - def __init__(self, project: str, cache_ttl_seconds: int, cache_mode: str): + def __init__(self, project: str, cache_ttl_seconds: int, cache_mode: str, on_cache_refresh_failure: Optional[Callable[[Exception], None]] = None): self.cache_mode = cache_mode + self._on_cache_refresh_failure = on_cache_refresh_failure self.cached_registry_proto = RegistryProto() self._refresh_lock = Lock() self.cached_registry_proto_ttl = timedelta( @@ -471,7 +473,8 @@ def refresh_loop(): if not self._stop_event.is_set(): self.refresh() except Exception as e: - logger.exception("Exception in refresh_loop: %s", e) + if self._on_cache_refresh_failure: + self._on_cache_refresh_failure(e) try: self.registry_refresh_thread = threading.Thread( @@ -483,6 +486,7 @@ def refresh_loop(): ) except Exception as e: logger.exception("Failed to start registry refresh thread: %s", e) + raise e def _exit_handler(self): logger.info("Exiting, setting stop event for registry cache refresh thread") diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 3954bcaea46..0b1f6ff97ad 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -264,6 +264,7 @@ def __init__( registry_config, project: str, repo_path: Optional[Path], + on_cache_refresh_failure: Optional[Callable[[Exception], None]] = None, ): assert registry_config is not None and isinstance( registry_config, SqlRegistryConfig @@ -296,6 +297,7 @@ def __init__( project=project, cache_ttl_seconds=registry_config.cache_ttl_seconds, cache_mode=registry_config.cache_mode, + on_cache_refresh_failure=on_cache_refresh_failure, ) def _sync_feast_metadata_to_projects_table(self): From 38fb85e634790b3c926396331f2da7389156c92d Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 5 Mar 2025 13:43:18 -0600 Subject: [PATCH 2/5] fix: run linter --- sdk/python/feast/infra/registry/caching_registry.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 9403bebb65a..dd6937e3037 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -24,13 +24,18 @@ from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView from feast.utils import _utc_now -from sdk.python.feast.expediagroup.caching_registry.functions import start_thread_safely logger = logging.getLogger(__name__) class CachingRegistry(BaseRegistry): - def __init__(self, project: str, cache_ttl_seconds: int, cache_mode: str, on_cache_refresh_failure: Optional[Callable[[Exception], None]] = None): + def __init__( + self, + project: str, + cache_ttl_seconds: int, + cache_mode: str, + on_cache_refresh_failure: Optional[Callable[[Exception], None]] = None, + ): self.cache_mode = cache_mode self._on_cache_refresh_failure = on_cache_refresh_failure self.cached_registry_proto = RegistryProto() From 66faeaed800dd050d468a9ee98e21c1c008f63e6 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 5 Mar 2025 14:32:57 -0600 Subject: [PATCH 3/5] throw error in refresh loop 10% of time for testing --- sdk/python/feast/infra/registry/caching_registry.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index dd6937e3037..fe2878d2109 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -1,5 +1,6 @@ import atexit import logging +import math import threading import time import warnings @@ -474,6 +475,8 @@ def _start_thread_async_refresh(self, cache_ttl_seconds): def refresh_loop(): while not self._stop_event.is_set(): try: + if (math.random() * 100) < 10: + raise Exception("Random exception to test error handling") time.sleep(cache_ttl_seconds) if not self._stop_event.is_set(): self.refresh() From 6a11cfcef91a7a21d0b0b086ecb64677248af9ad Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 5 Mar 2025 15:00:30 -0600 Subject: [PATCH 4/5] fix: random --- sdk/python/feast/infra/registry/caching_registry.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index fe2878d2109..0c675f27384 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -1,6 +1,6 @@ import atexit import logging -import math +import random import threading import time import warnings @@ -475,7 +475,7 @@ def _start_thread_async_refresh(self, cache_ttl_seconds): def refresh_loop(): while not self._stop_event.is_set(): try: - if (math.random() * 100) < 10: + if random.randint(1, 100) < 10: raise Exception("Random exception to test error handling") time.sleep(cache_ttl_seconds) if not self._stop_event.is_set(): From 66314f739cbe0a5e5a41b69ab79565c50b346e5f Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 5 Mar 2025 15:40:26 -0600 Subject: [PATCH 5/5] remove random failures --- sdk/python/feast/infra/registry/caching_registry.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 0c675f27384..dd6937e3037 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -1,6 +1,5 @@ import atexit import logging -import random import threading import time import warnings @@ -475,8 +474,6 @@ def _start_thread_async_refresh(self, cache_ttl_seconds): def refresh_loop(): while not self._stop_event.is_set(): try: - if random.randint(1, 100) < 10: - raise Exception("Random exception to test error handling") time.sleep(cache_ttl_seconds) if not self._stop_event.is_set(): self.refresh()