From eafc7e6e16272eaeb50a218bb7ab4e19c88ab922 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Mon, 26 Aug 2024 13:33:02 -0500 Subject: [PATCH 1/5] feat: Search Projects RPC --- protos/feast/registry/RegistryServer.proto | 14 +++ sdk/python/feast/infra/registry/sql.py | 105 ++++++++++++++++++++- 2 files changed, 115 insertions(+), 4 deletions(-) diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto index 44529f5409c..7c018f2b483 100644 --- a/protos/feast/registry/RegistryServer.proto +++ b/protos/feast/registry/RegistryServer.proto @@ -68,6 +68,20 @@ service RegistryServer{ rpc Refresh (RefreshRequest) returns (google.protobuf.Empty) {} rpc Proto (google.protobuf.Empty) returns (feast.core.Registry) {} + // Search RPCs + rpc SearchProjects (SearchProjectsRequest) returns (SearchProjectsResponse) {} +} + +message SearchProjectsRequest { + string search_text = 1; + google.protobuf.Timestamp updated_at = 2; + int32 page_size = 3; + int32 page_index = 4; +} + +message SearchProjectsResponse { + repeated feast.core.ProjectMetadata projects = 1; + int32 total_page_indices = 2; } message RefreshRequest { diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index eef3197c798..56b1ef4b488 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -5,7 +5,7 @@ from datetime import datetime, timezone from enum import Enum from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union from pydantic import StrictStr from sqlalchemy import ( # type: ignore @@ -21,6 +21,8 @@ insert, select, update, + and_, + func ) from sqlalchemy.engine import Engine @@ -1001,14 +1003,14 @@ def get_all_projects(self) -> List[ProjectMetadata]: project_name=project_id ) - project_metadata_model: ProjectMetadata = project_metadata_dict[ + project_metadata: ProjectMetadata = project_metadata_dict[ project_id ] if metadata_key == FeastMetadataKeys.PROJECT_UUID.value: - project_metadata_model.project_uuid = metadata_value + project_metadata.project_uuid = metadata_value if metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value: - project_metadata_model.last_updated_timestamp = ( + project_metadata.last_updated_timestamp = ( datetime.fromtimestamp(int(metadata_value), tz=timezone.utc) ) return list(project_metadata_dict.values()) @@ -1043,3 +1045,98 @@ def _get_project_metadata( return project_metadata else: return None + + def search_projects( + self, + search_text="", + updated_at: datetime = None, + page_size=10, + page_index=0 + ) -> Tuple[List[ProjectMetadata], int]: + """ + Search for projects based on the provided search parameters with pagination, + using SQL queries to handle filtering efficiently, including a LIKE statement for matching search_text. + Returns a tuple of the list of ProjectMetadata objects, and the total number of pages. + + :param search_text: Filter by project name using a LIKE statement. + :param updated_at: Filter projects updated after this timestamp (datetime). + :param page_size: The number of results to return per page. + :param page_index: The index for fetching the next page (used as an offset). + :return: A tuple containing the list of ProjectMetadata objects, and the total number of pages. + """ + project_metadata_dict: Dict[str, ProjectMetadata] = {} + + with self.engine.begin() as conn: + # Base SQL query to count total number of matching projects + count_stmt = select(func.count().label( + 'total')).select_from(feast_metadata) + + if search_text: + count_stmt = count_stmt.where( + feast_metadata.c.project_id.like(f"%{search_text}%")) + + if updated_at is not None: + updated_at_timestamp = int(updated_at.timestamp()) + count_stmt = count_stmt.where( + and_( + feast_metadata.c.metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value, + feast_metadata.c.metadata_value >= str( + updated_at_timestamp) + ) + ) + + # Execute the count query to get the total number of matching projects + total_projects = conn.execute(count_stmt).scalar() + + # gRPC defaults empty page_size to 0, which overrides the default value of 10. Have to explicitly set it to 10 here. + page_size = page_size if page_size > 0 else 10 + # Calculate total pages + total_page_indices = ( + total_projects + page_size - 1) // page_size + + # Base SQL query for retrieving projects + stmt = select(feast_metadata) + + if search_text: + stmt = stmt.where( + feast_metadata.c.project_id.like(f"%{search_text}%")) + + if updated_at is not None: + stmt = stmt.where( + and_( + feast_metadata.c.metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value, + feast_metadata.c.metadata_value >= str( + updated_at_timestamp) + ) + ) + + # Apply ordering and pagination + stmt = stmt.order_by(feast_metadata.c.project_id).limit( + page_size).offset(page_index * page_size) + + rows = conn.execute(stmt).all() + + if rows: + for row in rows: + project_id = row._mapping["project_id"] + metadata_key = row._mapping["metadata_key"] + metadata_value = row._mapping["metadata_value"] + + if project_id not in project_metadata_dict: + project_metadata_dict[project_id] = ProjectMetadata( + project_name=project_id + ) + + project_metadata: ProjectMetadata = project_metadata_dict[ + project_id] + + if metadata_key == FeastMetadataKeys.PROJECT_UUID.value: + project_metadata.project_uuid = metadata_value + + if metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value: + project_metadata.last_updated_timestamp = datetime.utcfromtimestamp( + int(metadata_value)) + + project_list = list(project_metadata_dict.values()) + + return project_list, total_page_indices From b91245fee1d7c11835e3ff6b728e4acf38651c87 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Mon, 26 Aug 2024 13:39:32 -0500 Subject: [PATCH 2/5] fix: lint failure --- sdk/python/feast/infra/registry/sql.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 56b1ef4b488..21d94979c54 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1048,10 +1048,10 @@ def _get_project_metadata( def search_projects( self, - search_text="", - updated_at: datetime = None, - page_size=10, - page_index=0 + search_text: str = "", + updated_at: Optional[datetime] = None, + page_size: int = 10, + page_index: int = 0 ) -> Tuple[List[ProjectMetadata], int]: """ Search for projects based on the provided search parameters with pagination, From f7f7a39331addca17ce6f7161cae5e6ba59afdba Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Mon, 26 Aug 2024 13:43:21 -0500 Subject: [PATCH 3/5] fix: lint failure again --- sdk/python/feast/infra/registry/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 21d94979c54..f8ca2d05df5 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1086,7 +1086,7 @@ def search_projects( ) # Execute the count query to get the total number of matching projects - total_projects = conn.execute(count_stmt).scalar() + total_projects = conn.execute(count_stmt).scalar() or 0 # gRPC defaults empty page_size to 0, which overrides the default value of 10. Have to explicitly set it to 10 here. page_size = page_size if page_size > 0 else 10 From 0a312cc39ba636b6d146eeb21f2629799fd0285f Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Mon, 26 Aug 2024 13:49:35 -0500 Subject: [PATCH 4/5] fix: lint failure --- sdk/python/feast/infra/registry/sql.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index f8ca2d05df5..2ed8e2b94f8 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -16,13 +16,13 @@ MetaData, String, Table, + and_, create_engine, delete, + func, insert, select, update, - and_, - func ) from sqlalchemy.engine import Engine From 07cf95809dad2a089a10c285740018c743c3bc45 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Mon, 26 Aug 2024 13:54:10 -0500 Subject: [PATCH 5/5] fix: lint failure --- sdk/python/feast/infra/registry/sql.py | 41 ++++++++++++++------------ 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 2ed8e2b94f8..8ac040a7a1b 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1051,7 +1051,7 @@ def search_projects( search_text: str = "", updated_at: Optional[datetime] = None, page_size: int = 10, - page_index: int = 0 + page_index: int = 0, ) -> Tuple[List[ProjectMetadata], int]: """ Search for projects based on the provided search parameters with pagination, @@ -1068,20 +1068,20 @@ def search_projects( with self.engine.begin() as conn: # Base SQL query to count total number of matching projects - count_stmt = select(func.count().label( - 'total')).select_from(feast_metadata) + count_stmt = select(func.count().label("total")).select_from(feast_metadata) if search_text: count_stmt = count_stmt.where( - feast_metadata.c.project_id.like(f"%{search_text}%")) + feast_metadata.c.project_id.like(f"%{search_text}%") + ) if updated_at is not None: updated_at_timestamp = int(updated_at.timestamp()) count_stmt = count_stmt.where( and_( - feast_metadata.c.metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value, - feast_metadata.c.metadata_value >= str( - updated_at_timestamp) + feast_metadata.c.metadata_key + == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value, + feast_metadata.c.metadata_value >= str(updated_at_timestamp), ) ) @@ -1091,28 +1091,29 @@ def search_projects( # gRPC defaults empty page_size to 0, which overrides the default value of 10. Have to explicitly set it to 10 here. page_size = page_size if page_size > 0 else 10 # Calculate total pages - total_page_indices = ( - total_projects + page_size - 1) // page_size + total_page_indices = (total_projects + page_size - 1) // page_size # Base SQL query for retrieving projects stmt = select(feast_metadata) if search_text: - stmt = stmt.where( - feast_metadata.c.project_id.like(f"%{search_text}%")) + stmt = stmt.where(feast_metadata.c.project_id.like(f"%{search_text}%")) if updated_at is not None: stmt = stmt.where( and_( - feast_metadata.c.metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value, - feast_metadata.c.metadata_value >= str( - updated_at_timestamp) + feast_metadata.c.metadata_key + == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value, + feast_metadata.c.metadata_value >= str(updated_at_timestamp), ) ) # Apply ordering and pagination - stmt = stmt.order_by(feast_metadata.c.project_id).limit( - page_size).offset(page_index * page_size) + stmt = ( + stmt.order_by(feast_metadata.c.project_id) + .limit(page_size) + .offset(page_index * page_size) + ) rows = conn.execute(stmt).all() @@ -1128,14 +1129,16 @@ def search_projects( ) project_metadata: ProjectMetadata = project_metadata_dict[ - project_id] + project_id + ] if metadata_key == FeastMetadataKeys.PROJECT_UUID.value: project_metadata.project_uuid = metadata_value if metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value: - project_metadata.last_updated_timestamp = datetime.utcfromtimestamp( - int(metadata_value)) + project_metadata.last_updated_timestamp = ( + datetime.utcfromtimestamp(int(metadata_value)) + ) project_list = list(project_metadata_dict.values())