From 9cd2d59e386ad17b8de8a1f5ed2db1b5c3cb67d1 Mon Sep 17 00:00:00 2001 From: vineet-belur Date: Tue, 4 Nov 2025 13:17:13 -0800 Subject: [PATCH 1/4] feat: Adding first draft registry grpc/remote changes --- go/internal/feast/errors.go | 12 ++ go/internal/feast/registry/grpc.go | 202 ++++++++++++++++++++ go/internal/feast/registry/http.go | 11 +- go/internal/feast/registry/registry.go | 5 +- go/internal/feast/registry/registrystore.go | 10 + 5 files changed, 229 insertions(+), 11 deletions(-) create mode 100644 go/internal/feast/registry/grpc.go diff --git a/go/internal/feast/errors.go b/go/internal/feast/errors.go index f42b4aad82d..f00eb632573 100644 --- a/go/internal/feast/errors.go +++ b/go/internal/feast/errors.go @@ -1,6 +1,8 @@ package feast import ( + "fmt" + "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -20,3 +22,13 @@ func (FeastTransformationServiceNotConfigured) GRPCStatus() *status.Status { func (e FeastTransformationServiceNotConfigured) Error() string { return e.GRPCStatus().Err().Error() } + +// NotImplementedError represents an error for a function that is not yet implemented. +type NotImplementedError struct { + FunctionName string +} + +// Error implements the error interface for NotImplementedError. +func (e *NotImplementedError) Error() string { + return fmt.Sprintf("Function '%s' not implemented", e.FunctionName) +} diff --git a/go/internal/feast/registry/grpc.go b/go/internal/feast/registry/grpc.go new file mode 100644 index 00000000000..860aae2196a --- /dev/null +++ b/go/internal/feast/registry/grpc.go @@ -0,0 +1,202 @@ +package registry + +import ( + "context" + "fmt" + "time" + + "github.com/feast-dev/feast/go/protos/feast/core" + "github.com/feast-dev/feast/go/protos/feast/registry" + "github.com/rs/zerolog/log" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +type GrpcRegistryStore struct { + project string + endpoint string + clientId string + conn *grpc.ClientConn + client registry.RegistryServerClient + timeout time.Duration +} + +func NewGrpcRegistryStore(config *RegistryConfig, project string) (*GrpcRegistryStore, error) { + log.Info().Msgf("Using gRPC Registry: %s", config.Path) + + conn, err := grpc.NewClient( + config.Path, + grpc.WithBlock(), + grpc.WithTimeout(5*time.Second), + ) + if err != nil { + return nil, fmt.Errorf("failed to connect to gRPC registry: %w", err) + } + + client := registry.NewRegistryServerClient(conn) + + grs := &GrpcRegistryStore{ + project: project, + endpoint: config.Path, + clientId: config.ClientId, + conn: conn, + client: client, + timeout: 5 * time.Second, + } + + if err := grs.TestConnectivity(); err != nil { + conn.Close() + return nil, err + } + + return grs, nil +} + +func (grs *GrpcRegistryStore) TestConnectivity() error { + // May consider removing this, however added for feature parity with http_server, however no obvious healthcheck exists + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.ListEntitiesRequest{ + Project: grs.project, + AllowCache: false, + } + + _, err := grs.client.ListEntities(ctx, req) + if err != nil { + return fmt.Errorf("gRPC registry connectivity check failed: %w", err) + } + + return nil +} + +func (grs *GrpcRegistryStore) withMetadata(ctx context.Context) context.Context { + if grs.clientId != "" { + md := metadata.Pairs("client-id", grs.clientId) + ctx = metadata.NewOutgoingContext(ctx, md) + } + return ctx +} + +func (grs *GrpcRegistryStore) GetEntity(name string, allowCache bool) (*core.Entity, error) { + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.GetEntityRequest{ + Name: name, + Project: grs.project, + AllowCache: allowCache, + } + + entity, err := grs.client.GetEntity(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get entity %s: %w", name, err) + } + + return entity, nil +} + +func (grs *GrpcRegistryStore) GetFeatureView(name string, allowCache bool) (*core.FeatureView, error) { + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.GetFeatureViewRequest{ + Name: name, + Project: grs.project, + AllowCache: allowCache, + } + + fv, err := grs.client.GetFeatureView(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get feature view %s: %w", name, err) + } + + return fv, nil +} + +func (grs *GrpcRegistryStore) GetSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) { + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.GetSortedFeatureViewRequest{ + Name: name, + Project: grs.project, + AllowCache: allowCache, + } + + sfv, err := grs.client.GetSortedFeatureView(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get sorted feature view %s: %w", name, err) + } + + return sfv, nil +} + +func (grs *GrpcRegistryStore) GetFeatureService(name string, allowCache bool) (*core.FeatureService, error) { + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.GetFeatureServiceRequest{ + Name: name, + Project: grs.project, + AllowCache: allowCache, + } + + fs, err := grs.client.GetFeatureService(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get feature service %s: %w", name, err) + } + + return fs, nil +} + +func (grs *GrpcRegistryStore) GetOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) { + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.GetOnDemandFeatureViewRequest{ + Name: name, + Project: grs.project, + AllowCache: allowCache, + } + + odfv, err := grs.client.GetOnDemandFeatureView(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get on-demand feature view %s: %w", name, err) + } + + return odfv, nil +} + +func (grs *GrpcRegistryStore) GetRegistryProto() (*core.Registry, error) { + // gRPC stores use fallback mode, so this returns empty registry + registry := core.Registry{} + return ®istry, nil +} + +func (grs *GrpcRegistryStore) UpdateRegistryProto(rp *core.Registry) error { + return &NotImplementedError{FunctionName: "UpdateRegistryProto"} +} + +func (grs *GrpcRegistryStore) Teardown() error { + if grs.conn != nil { + return grs.conn.Close() + } + return nil +} + +func (grs *GrpcRegistryStore) HasFallback() bool { + return true +} diff --git a/go/internal/feast/registry/http.go b/go/internal/feast/registry/http.go index b6474e48dcc..dde0dcb0351 100644 --- a/go/internal/feast/registry/http.go +++ b/go/internal/feast/registry/http.go @@ -8,6 +8,7 @@ import ( "time" "github.com/feast-dev/feast/go/protos/feast/core" + "github.com/rs/zerolog/log" "google.golang.org/protobuf/proto" ) @@ -19,16 +20,6 @@ type HttpRegistryStore struct { client http.Client } -// NotImplementedError represents an error for a function that is not yet implemented. -type NotImplementedError struct { - FunctionName string -} - -// Error implements the error interface for NotImplementedError. -func (e *NotImplementedError) Error() string { - return fmt.Sprintf("Function '%s' not implemented", e.FunctionName) -} - func NewHttpRegistryStore(config *RegistryConfig, project string) (*HttpRegistryStore, error) { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index a5a23e85976..875af4431e4 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -21,6 +21,7 @@ var REGISTRY_STORE_CLASS_FOR_SCHEME map[string]string = map[string]string{ "file": "FileRegistryStore", "http": "HttpRegistryStore", "https": "HttpRegistryStore", + "grpc": "GrpcRegistryStore", "": "FileRegistryStore", } @@ -182,7 +183,7 @@ func (r *Registry) InitializeRegistry() error { registryProto, err := r.registryStore.GetRegistryProto() if err != nil { switch r.registryStore.(type) { - case *FileRegistryStore, *HttpRegistryStore: + case *FileRegistryStore, *HttpRegistryStore, *GrpcRegistryStore: // TODO: Extend to support remote registry log.Error().Err(err).Msg("Registry Initialization Failed") return err default: @@ -418,6 +419,8 @@ func getRegistryStoreFromType(registryStoreType string, registryConfig *Registry return NewHttpRegistryStore(registryConfig, project) case "S3RegistryStore": return NewS3RegistryStore(registryConfig, repoPath), nil + case "GrpcRegistryStore": + return NewGrpcRegistryStore(registryConfig, project) } return nil, errors.GrpcInternalErrorf("only FileRegistryStore or HttpRegistryStore as a RegistryStore is supported at this moment") } diff --git a/go/internal/feast/registry/registrystore.go b/go/internal/feast/registry/registrystore.go index c9c84508b7c..dc9105e3381 100644 --- a/go/internal/feast/registry/registrystore.go +++ b/go/internal/feast/registry/registrystore.go @@ -11,3 +11,13 @@ type RegistryStore interface { Teardown() error HasFallback() bool } + +type RemoteRegistryStore interface { + RegistryStore // Add base interface for composition. + + GetEntity(name string, allowCache bool) (*core.Entity, error) + GetFeatureView(name string, allowCache bool) (*core.FeatureView, error) + GetSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) + GetFeatureService(name string, allowCache bool) (*core.FeatureService, error) + GetOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) +} From 1c564a3cf54b54a1a0bfa59f0c6ac4e52c8160ce Mon Sep 17 00:00:00 2001 From: vineet-belur Date: Tue, 4 Nov 2025 13:21:54 -0800 Subject: [PATCH 2/4] fix: Remove error changes/add correct type annotations for registry --- go/internal/feast/errors.go | 12 --- go/internal/feast/registry/http.go | 10 +++ go/internal/feast/registry/registry.go | 110 ++++++++----------------- 3 files changed, 45 insertions(+), 87 deletions(-) diff --git a/go/internal/feast/errors.go b/go/internal/feast/errors.go index f00eb632573..f42b4aad82d 100644 --- a/go/internal/feast/errors.go +++ b/go/internal/feast/errors.go @@ -1,8 +1,6 @@ package feast import ( - "fmt" - "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -22,13 +20,3 @@ func (FeastTransformationServiceNotConfigured) GRPCStatus() *status.Status { func (e FeastTransformationServiceNotConfigured) Error() string { return e.GRPCStatus().Err().Error() } - -// NotImplementedError represents an error for a function that is not yet implemented. -type NotImplementedError struct { - FunctionName string -} - -// Error implements the error interface for NotImplementedError. -func (e *NotImplementedError) Error() string { - return fmt.Sprintf("Function '%s' not implemented", e.FunctionName) -} diff --git a/go/internal/feast/registry/http.go b/go/internal/feast/registry/http.go index dde0dcb0351..2c71853b19c 100644 --- a/go/internal/feast/registry/http.go +++ b/go/internal/feast/registry/http.go @@ -20,6 +20,16 @@ type HttpRegistryStore struct { client http.Client } +// NotImplementedError represents an error for a function that is not yet implemented. +type NotImplementedError struct { + FunctionName string +} + +// Error implements the error interface for NotImplementedError. +func (e *NotImplementedError) Error() string { + return fmt.Sprintf("Function '%s' not implemented", e.FunctionName) +} + func NewHttpRegistryStore(config *RegistryConfig, project string) (*HttpRegistryStore, error) { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index 875af4431e4..f4d9e799757 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -265,53 +265,37 @@ func loadModels[U any, T any](protoList []U, cachedModels *cacheMap[T], modelFac } } -func (r *Registry) GetEntity(project string, entityName string) (*model.Entity, error) { - if r.registryStore.HasFallback() { - return r.cachedEntities.getOrLoad(project, entityName, r.GetEntityFromRegistry) - } - - if entity, ok := r.cachedEntities.get(project, entityName); ok { - return entity, nil +func (r *Registry) GetEntityFromRegistry(entityName string, project string) (*model.Entity, error) { + remoteStore, ok := r.registryStore.(RemoteRegistryStore) + if !ok { + return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - return nil, errors.GrpcNotFoundErrorf("no cached entity %s found for project %s", entityName, project) -} - -func (r *Registry) GetEntityFromRegistry(entityName string, project string) (*model.Entity, error) { - entityProto, err := r.registryStore.(*HttpRegistryStore).getEntity(entityName, true) + entityProto, err := remoteStore.GetEntity(entityName, true) if err != nil { - if errors.IsHTTPNotFoundError(err) { + if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no entity %s found in project %s", entityName, project) return nil, errors.GrpcNotFoundErrorf("no entity %s found in project %s", entityName, project) } - - log.Error().Err(err).Msgf("no entity %s found in project %s", entityName, project) + log.Error().Err(err).Msgf("error retrieving entity %s in project %s", entityName, project) return nil, errors.GrpcInternalErrorf("error retrieving entity %s in project %s: %v", entityName, project, err) } return model.NewEntityFromProto(entityProto), nil } -func (r *Registry) GetFeatureView(project string, featureViewName string) (*model.FeatureView, error) { - if r.registryStore.HasFallback() { - return r.cachedFeatureViews.getOrLoad(project, featureViewName, r.GetFeatureViewFromRegistry) - } - - if cachedFeatureView, ok := r.cachedFeatureViews.get(project, featureViewName); ok { - return cachedFeatureView, nil +func (r *Registry) GetFeatureViewFromRegistry(featureViewName string, project string) (*model.FeatureView, error) { + remoteStore, ok := r.registryStore.(RemoteRegistryStore) + if !ok { + return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - return nil, errors.GrpcNotFoundErrorf("no cached feature view %s found for project %s", featureViewName, project) -} - -func (r *Registry) GetFeatureViewFromRegistry(featureViewName string, project string) (*model.FeatureView, error) { - featureViewProto, err := r.registryStore.(*HttpRegistryStore).getFeatureView(featureViewName, true) + featureViewProto, err := remoteStore.GetFeatureView(featureViewName, true) if err != nil { - if errors.IsHTTPNotFoundError(err) { + if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no feature view %s found in project %s", featureViewName, project) return nil, errors.GrpcNotFoundErrorf("no feature view %s found in project %s", featureViewName, project) } - log.Error().Err(err).Msgf("error retrieving feature view %s in project %s", featureViewName, project) return nil, errors.GrpcInternalErrorf("error retrieving feature view %s in project %s: %v", featureViewName, project, err) } @@ -319,26 +303,18 @@ func (r *Registry) GetFeatureViewFromRegistry(featureViewName string, project st return model.NewFeatureViewFromProto(featureViewProto), nil } -func (r *Registry) GetSortedFeatureView(project string, sortedFeatureViewName string) (*model.SortedFeatureView, error) { - if r.registryStore.HasFallback() { - return r.cachedSortedFeatureViews.getOrLoad(project, sortedFeatureViewName, r.GetSortedFeatureViewFromRegistry) - } - - if cachedSortedFeatureView, ok := r.cachedSortedFeatureViews.get(project, sortedFeatureViewName); ok { - return cachedSortedFeatureView, nil +func (r *Registry) GetSortedFeatureViewFromRegistry(sortedFeatureViewName string, project string) (*model.SortedFeatureView, error) { + remoteStore, ok := r.registryStore.(RemoteRegistryStore) + if !ok { + return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - return nil, errors.GrpcNotFoundErrorf("no cached sorted feature view %s found for project %s", sortedFeatureViewName, project) -} - -func (r *Registry) GetSortedFeatureViewFromRegistry(sortedFeatureViewName string, project string) (*model.SortedFeatureView, error) { - sortedFeatureViewProto, err := r.registryStore.(*HttpRegistryStore).getSortedFeatureView(sortedFeatureViewName, true) + sortedFeatureViewProto, err := remoteStore.GetSortedFeatureView(sortedFeatureViewName, true) if err != nil { - if errors.IsHTTPNotFoundError(err) { + if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no sorted feature view %s found in project %s", sortedFeatureViewName, project) return nil, errors.GrpcNotFoundErrorf("no sorted feature view %s found in project %s", sortedFeatureViewName, project) } - log.Error().Err(err).Msgf("error retrieving sorted feature view %s in project %s", sortedFeatureViewName, project) return nil, errors.GrpcInternalErrorf("error retrieving sorted feature view %s in project %s: %v", sortedFeatureViewName, project, err) } @@ -346,26 +322,18 @@ func (r *Registry) GetSortedFeatureViewFromRegistry(sortedFeatureViewName string return model.NewSortedFeatureViewFromProto(sortedFeatureViewProto), nil } -func (r *Registry) GetFeatureService(project string, featureServiceName string) (*model.FeatureService, error) { - if r.registryStore.HasFallback() { - return r.cachedFeatureServices.getOrLoad(project, featureServiceName, r.GetFeatureServiceFromRegistry) - } - - if cachedFeatureService, ok := r.cachedFeatureServices.get(project, featureServiceName); ok { - return cachedFeatureService, nil +func (r *Registry) GetFeatureServiceFromRegistry(featureServiceName string, project string) (*model.FeatureService, error) { + remoteStore, ok := r.registryStore.(RemoteRegistryStore) + if !ok { + return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - return nil, errors.GrpcNotFoundErrorf("no cached feature service %s found for project %s", featureServiceName, project) -} - -func (r *Registry) GetFeatureServiceFromRegistry(featureServiceName string, project string) (*model.FeatureService, error) { - featureServiceProto, err := r.registryStore.(*HttpRegistryStore).getFeatureService(featureServiceName, true) + featureServiceProto, err := remoteStore.GetFeatureService(featureServiceName, true) if err != nil { - if errors.IsHTTPNotFoundError(err) { + if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no feature service %s found in project %s", featureServiceName, project) return nil, errors.GrpcNotFoundErrorf("no feature service %s found in project %s", featureServiceName, project) } - log.Error().Err(err).Msgf("error retrieving feature service %s in project %s", featureServiceName, project) return nil, errors.GrpcInternalErrorf("error retrieving feature service %s in project %s: %v", featureServiceName, project, err) } @@ -373,26 +341,18 @@ func (r *Registry) GetFeatureServiceFromRegistry(featureServiceName string, proj return model.NewFeatureServiceFromProto(featureServiceProto), nil } -func (r *Registry) GetOnDemandFeatureView(project string, onDemandFeatureViewName string) (*model.OnDemandFeatureView, error) { - if r.registryStore.HasFallback() { - return r.cachedOnDemandFeatureViews.getOrLoad(project, onDemandFeatureViewName, r.GetOnDemandFeatureViewFromRegistry) - } - - if cachedOnDemandFeatureView, ok := r.cachedOnDemandFeatureViews.get(project, onDemandFeatureViewName); ok { - return cachedOnDemandFeatureView, nil +func (r *Registry) GetOnDemandFeatureViewFromRegistry(onDemandFeatureViewName string, project string) (*model.OnDemandFeatureView, error) { + remoteStore, ok := r.registryStore.(RemoteRegistryStore) + if !ok { + return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - return nil, errors.GrpcNotFoundErrorf("no cached on demand feature view %s found for project %s", onDemandFeatureViewName, project) -} - -func (r *Registry) GetOnDemandFeatureViewFromRegistry(onDemandFeatureViewName string, project string) (*model.OnDemandFeatureView, error) { - onDemandFeatureViewProto, err := r.registryStore.(*HttpRegistryStore).getOnDemandFeatureView(onDemandFeatureViewName, true) + onDemandFeatureViewProto, err := remoteStore.GetOnDemandFeatureView(onDemandFeatureViewName, true) if err != nil { - if errors.IsHTTPNotFoundError(err) { + if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no on demand feature view %s found in project %s", onDemandFeatureViewName, project) return nil, errors.GrpcNotFoundErrorf("no on demand feature view %s found in project %s", onDemandFeatureViewName, project) } - log.Error().Err(err).Msgf("error retrieving on demand feature view %s in project %s", onDemandFeatureViewName, project) return nil, errors.GrpcInternalErrorf("error retrieving on demand feature view %s in project %s: %v", onDemandFeatureViewName, project, err) } @@ -413,14 +373,14 @@ func getRegistryStoreFromScheme(registryPath string, registryConfig *RegistryCon func getRegistryStoreFromType(registryStoreType string, registryConfig *RegistryConfig, repoPath string, project string) (RegistryStore, error) { switch registryStoreType { - case "FileRegistryStore": - return NewFileRegistryStore(registryConfig, repoPath), nil case "HttpRegistryStore": return NewHttpRegistryStore(registryConfig, project) - case "S3RegistryStore": - return NewS3RegistryStore(registryConfig, repoPath), nil case "GrpcRegistryStore": return NewGrpcRegistryStore(registryConfig, project) + case "FileRegistryStore": + return NewFileRegistryStore(registryConfig, repoPath), nil + case "S3RegistryStore": + return NewS3RegistryStore(registryConfig, repoPath), nil } return nil, errors.GrpcInternalErrorf("only FileRegistryStore or HttpRegistryStore as a RegistryStore is supported at this moment") } From 9b7e35183d9562f444df53b2d1a047fd4c1b845d Mon Sep 17 00:00:00 2001 From: vineet-belur Date: Mon, 12 Jan 2026 10:52:14 -0600 Subject: [PATCH 3/4] fix: Add correct method names --- go/internal/feast/registry/registry.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index f4d9e799757..f4f935a88b6 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -211,11 +211,11 @@ func (r *Registry) RefreshRegistryOnInterval() { func (r *Registry) refresh() error { if r.registryStore.HasFallback() { - r.cachedEntities.expireCachedModels(r.GetEntityFromRegistry) - r.cachedFeatureServices.expireCachedModels(r.GetFeatureServiceFromRegistry) - r.cachedFeatureViews.expireCachedModels(r.GetFeatureViewFromRegistry) - r.cachedSortedFeatureViews.expireCachedModels(r.GetSortedFeatureViewFromRegistry) - r.cachedOnDemandFeatureViews.expireCachedModels(r.GetOnDemandFeatureViewFromRegistry) + r.cachedEntities.expireCachedModels(r.GetEntity) + r.cachedFeatureServices.expireCachedModels(r.GetFeatureService) + r.cachedFeatureViews.expireCachedModels(r.GetFeatureView) + r.cachedSortedFeatureViews.expireCachedModels(r.GetSortedFeatureView) + r.cachedOnDemandFeatureViews.expireCachedModels(r.GetOnDemandFeatureView) } else { registryProto, err := r.registryStore.GetRegistryProto() if err != nil { @@ -265,7 +265,7 @@ func loadModels[U any, T any](protoList []U, cachedModels *cacheMap[T], modelFac } } -func (r *Registry) GetEntityFromRegistry(entityName string, project string) (*model.Entity, error) { +func (r *Registry) GetEntity(entityName string, project string) (*model.Entity, error) { remoteStore, ok := r.registryStore.(RemoteRegistryStore) if !ok { return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") @@ -284,7 +284,7 @@ func (r *Registry) GetEntityFromRegistry(entityName string, project string) (*mo return model.NewEntityFromProto(entityProto), nil } -func (r *Registry) GetFeatureViewFromRegistry(featureViewName string, project string) (*model.FeatureView, error) { +func (r *Registry) GetFeatureView(featureViewName string, project string) (*model.FeatureView, error) { remoteStore, ok := r.registryStore.(RemoteRegistryStore) if !ok { return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") @@ -303,7 +303,7 @@ func (r *Registry) GetFeatureViewFromRegistry(featureViewName string, project st return model.NewFeatureViewFromProto(featureViewProto), nil } -func (r *Registry) GetSortedFeatureViewFromRegistry(sortedFeatureViewName string, project string) (*model.SortedFeatureView, error) { +func (r *Registry) GetSortedFeatureView(sortedFeatureViewName string, project string) (*model.SortedFeatureView, error) { remoteStore, ok := r.registryStore.(RemoteRegistryStore) if !ok { return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") @@ -322,7 +322,7 @@ func (r *Registry) GetSortedFeatureViewFromRegistry(sortedFeatureViewName string return model.NewSortedFeatureViewFromProto(sortedFeatureViewProto), nil } -func (r *Registry) GetFeatureServiceFromRegistry(featureServiceName string, project string) (*model.FeatureService, error) { +func (r *Registry) GetFeatureService(featureServiceName string, project string) (*model.FeatureService, error) { remoteStore, ok := r.registryStore.(RemoteRegistryStore) if !ok { return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") @@ -341,7 +341,7 @@ func (r *Registry) GetFeatureServiceFromRegistry(featureServiceName string, proj return model.NewFeatureServiceFromProto(featureServiceProto), nil } -func (r *Registry) GetOnDemandFeatureViewFromRegistry(onDemandFeatureViewName string, project string) (*model.OnDemandFeatureView, error) { +func (r *Registry) GetOnDemandFeatureView(onDemandFeatureViewName string, project string) (*model.OnDemandFeatureView, error) { remoteStore, ok := r.registryStore.(RemoteRegistryStore) if !ok { return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") From e8620fe09668ee8be5d7ec6c2dd89ff342e41793 Mon Sep 17 00:00:00 2001 From: vineet-belur Date: Mon, 12 Jan 2026 11:45:42 -0600 Subject: [PATCH 4/4] fix: parity with get method names --- go/internal/feast/registry/grpc.go | 10 +++++----- go/internal/feast/registry/http.go | 10 +++++----- go/internal/feast/registry/registry.go | 10 +++++----- go/internal/feast/registry/registrystore.go | 10 +++++----- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/go/internal/feast/registry/grpc.go b/go/internal/feast/registry/grpc.go index 860aae2196a..93afb3f4e32 100644 --- a/go/internal/feast/registry/grpc.go +++ b/go/internal/feast/registry/grpc.go @@ -80,7 +80,7 @@ func (grs *GrpcRegistryStore) withMetadata(ctx context.Context) context.Context return ctx } -func (grs *GrpcRegistryStore) GetEntity(name string, allowCache bool) (*core.Entity, error) { +func (grs *GrpcRegistryStore) getEntity(name string, allowCache bool) (*core.Entity, error) { ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) defer cancel() @@ -100,7 +100,7 @@ func (grs *GrpcRegistryStore) GetEntity(name string, allowCache bool) (*core.Ent return entity, nil } -func (grs *GrpcRegistryStore) GetFeatureView(name string, allowCache bool) (*core.FeatureView, error) { +func (grs *GrpcRegistryStore) getFeatureView(name string, allowCache bool) (*core.FeatureView, error) { ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) defer cancel() @@ -120,7 +120,7 @@ func (grs *GrpcRegistryStore) GetFeatureView(name string, allowCache bool) (*cor return fv, nil } -func (grs *GrpcRegistryStore) GetSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) { +func (grs *GrpcRegistryStore) getSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) { ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) defer cancel() @@ -140,7 +140,7 @@ func (grs *GrpcRegistryStore) GetSortedFeatureView(name string, allowCache bool) return sfv, nil } -func (grs *GrpcRegistryStore) GetFeatureService(name string, allowCache bool) (*core.FeatureService, error) { +func (grs *GrpcRegistryStore) getFeatureService(name string, allowCache bool) (*core.FeatureService, error) { ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) defer cancel() @@ -160,7 +160,7 @@ func (grs *GrpcRegistryStore) GetFeatureService(name string, allowCache bool) (* return fs, nil } -func (grs *GrpcRegistryStore) GetOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) { +func (grs *GrpcRegistryStore) getOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) { ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) defer cancel() diff --git a/go/internal/feast/registry/http.go b/go/internal/feast/registry/http.go index 2c71853b19c..c4554f39cbd 100644 --- a/go/internal/feast/registry/http.go +++ b/go/internal/feast/registry/http.go @@ -108,7 +108,7 @@ func (r *HttpRegistryStore) loadProtobufMessages(url string, messageProcessor fu return nil } -func (r *HttpRegistryStore) getFeatureService(name string, allowCache bool) (*core.FeatureService, error) { +func (r *HttpRegistryStore) GetFeatureService(name string, allowCache bool) (*core.FeatureService, error) { url := fmt.Sprintf("%s/projects/%s/feature_services/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache) featureService := &core.FeatureService{} err := r.loadProtobufMessages(url, func(data []byte) error { @@ -124,7 +124,7 @@ func (r *HttpRegistryStore) getFeatureService(name string, allowCache bool) (*co return featureService, nil } -func (r *HttpRegistryStore) getEntity(name string, allowCache bool) (*core.Entity, error) { +func (r *HttpRegistryStore) GetEntity(name string, allowCache bool) (*core.Entity, error) { url := fmt.Sprintf("%s/projects/%s/entities/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache) entity := &core.Entity{} err := r.loadProtobufMessages(url, func(data []byte) error { @@ -141,7 +141,7 @@ func (r *HttpRegistryStore) getEntity(name string, allowCache bool) (*core.Entit return entity, nil } -func (r *HttpRegistryStore) getFeatureView(name string, allowCache bool) (*core.FeatureView, error) { +func (r *HttpRegistryStore) GetFeatureView(name string, allowCache bool) (*core.FeatureView, error) { url := fmt.Sprintf("%s/projects/%s/feature_views/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache) featureView := &core.FeatureView{} err := r.loadProtobufMessages(url, func(data []byte) error { @@ -157,7 +157,7 @@ func (r *HttpRegistryStore) getFeatureView(name string, allowCache bool) (*core. return featureView, nil } -func (r *HttpRegistryStore) getOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) { +func (r *HttpRegistryStore) GetOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) { url := fmt.Sprintf("%s/projects/%s/on_demand_feature_views/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache) onDemandFeatureView := &core.OnDemandFeatureView{} err := r.loadProtobufMessages(url, func(data []byte) error { @@ -173,7 +173,7 @@ func (r *HttpRegistryStore) getOnDemandFeatureView(name string, allowCache bool) return onDemandFeatureView, nil } -func (r *HttpRegistryStore) getSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) { +func (r *HttpRegistryStore) GetSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) { url := fmt.Sprintf("%s/projects/%s/sorted_feature_views/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache) sortedFeatureView := &core.SortedFeatureView{} err := r.loadProtobufMessages(url, func(data []byte) error { diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index f4f935a88b6..fac38d61dd0 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -271,7 +271,7 @@ func (r *Registry) GetEntity(entityName string, project string) (*model.Entity, return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - entityProto, err := remoteStore.GetEntity(entityName, true) + entityProto, err := remoteStore.getEntity(entityName, true) if err != nil { if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no entity %s found in project %s", entityName, project) @@ -290,7 +290,7 @@ func (r *Registry) GetFeatureView(featureViewName string, project string) (*mode return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - featureViewProto, err := remoteStore.GetFeatureView(featureViewName, true) + featureViewProto, err := remoteStore.getFeatureView(featureViewName, true) if err != nil { if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no feature view %s found in project %s", featureViewName, project) @@ -309,7 +309,7 @@ func (r *Registry) GetSortedFeatureView(sortedFeatureViewName string, project st return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - sortedFeatureViewProto, err := remoteStore.GetSortedFeatureView(sortedFeatureViewName, true) + sortedFeatureViewProto, err := remoteStore.getSortedFeatureView(sortedFeatureViewName, true) if err != nil { if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no sorted feature view %s found in project %s", sortedFeatureViewName, project) @@ -328,7 +328,7 @@ func (r *Registry) GetFeatureService(featureServiceName string, project string) return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - featureServiceProto, err := remoteStore.GetFeatureService(featureServiceName, true) + featureServiceProto, err := remoteStore.getFeatureService(featureServiceName, true) if err != nil { if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no feature service %s found in project %s", featureServiceName, project) @@ -347,7 +347,7 @@ func (r *Registry) GetOnDemandFeatureView(onDemandFeatureViewName string, projec return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - onDemandFeatureViewProto, err := remoteStore.GetOnDemandFeatureView(onDemandFeatureViewName, true) + onDemandFeatureViewProto, err := remoteStore.getOnDemandFeatureView(onDemandFeatureViewName, true) if err != nil { if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no on demand feature view %s found in project %s", onDemandFeatureViewName, project) diff --git a/go/internal/feast/registry/registrystore.go b/go/internal/feast/registry/registrystore.go index dc9105e3381..7ff45969d41 100644 --- a/go/internal/feast/registry/registrystore.go +++ b/go/internal/feast/registry/registrystore.go @@ -15,9 +15,9 @@ type RegistryStore interface { type RemoteRegistryStore interface { RegistryStore // Add base interface for composition. - GetEntity(name string, allowCache bool) (*core.Entity, error) - GetFeatureView(name string, allowCache bool) (*core.FeatureView, error) - GetSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) - GetFeatureService(name string, allowCache bool) (*core.FeatureService, error) - GetOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) + getEntity(name string, allowCache bool) (*core.Entity, error) + getFeatureView(name string, allowCache bool) (*core.FeatureView, error) + getSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) + getFeatureService(name string, allowCache bool) (*core.FeatureService, error) + getOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) }