diff --git a/go/internal/feast/registry/grpc.go b/go/internal/feast/registry/grpc.go new file mode 100644 index 00000000000..93afb3f4e32 --- /dev/null +++ b/go/internal/feast/registry/grpc.go @@ -0,0 +1,202 @@ +package registry + +import ( + "context" + "fmt" + "time" + + "github.com/feast-dev/feast/go/protos/feast/core" + "github.com/feast-dev/feast/go/protos/feast/registry" + "github.com/rs/zerolog/log" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +type GrpcRegistryStore struct { + project string + endpoint string + clientId string + conn *grpc.ClientConn + client registry.RegistryServerClient + timeout time.Duration +} + +func NewGrpcRegistryStore(config *RegistryConfig, project string) (*GrpcRegistryStore, error) { + log.Info().Msgf("Using gRPC Registry: %s", config.Path) + + conn, err := grpc.NewClient( + config.Path, + grpc.WithBlock(), + grpc.WithTimeout(5*time.Second), + ) + if err != nil { + return nil, fmt.Errorf("failed to connect to gRPC registry: %w", err) + } + + client := registry.NewRegistryServerClient(conn) + + grs := &GrpcRegistryStore{ + project: project, + endpoint: config.Path, + clientId: config.ClientId, + conn: conn, + client: client, + timeout: 5 * time.Second, + } + + if err := grs.TestConnectivity(); err != nil { + conn.Close() + return nil, err + } + + return grs, nil +} + +func (grs *GrpcRegistryStore) TestConnectivity() error { + // May consider removing this, however added for feature parity with http_server, however no obvious healthcheck exists + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.ListEntitiesRequest{ + Project: grs.project, + AllowCache: false, + } + + _, err := grs.client.ListEntities(ctx, req) + if err != nil { + return fmt.Errorf("gRPC registry connectivity check failed: %w", err) + } + + return nil +} + +func (grs *GrpcRegistryStore) withMetadata(ctx context.Context) context.Context { + if grs.clientId != "" { + md := metadata.Pairs("client-id", grs.clientId) + ctx = metadata.NewOutgoingContext(ctx, md) + } + return ctx +} + +func (grs *GrpcRegistryStore) getEntity(name string, allowCache bool) (*core.Entity, error) { + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.GetEntityRequest{ + Name: name, + Project: grs.project, + AllowCache: allowCache, + } + + entity, err := grs.client.GetEntity(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get entity %s: %w", name, err) + } + + return entity, nil +} + +func (grs *GrpcRegistryStore) getFeatureView(name string, allowCache bool) (*core.FeatureView, error) { + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.GetFeatureViewRequest{ + Name: name, + Project: grs.project, + AllowCache: allowCache, + } + + fv, err := grs.client.GetFeatureView(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get feature view %s: %w", name, err) + } + + return fv, nil +} + +func (grs *GrpcRegistryStore) getSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) { + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.GetSortedFeatureViewRequest{ + Name: name, + Project: grs.project, + AllowCache: allowCache, + } + + sfv, err := grs.client.GetSortedFeatureView(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get sorted feature view %s: %w", name, err) + } + + return sfv, nil +} + +func (grs *GrpcRegistryStore) getFeatureService(name string, allowCache bool) (*core.FeatureService, error) { + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.GetFeatureServiceRequest{ + Name: name, + Project: grs.project, + AllowCache: allowCache, + } + + fs, err := grs.client.GetFeatureService(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get feature service %s: %w", name, err) + } + + return fs, nil +} + +func (grs *GrpcRegistryStore) getOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) { + ctx, cancel := context.WithTimeout(context.Background(), grs.timeout) + defer cancel() + + ctx = grs.withMetadata(ctx) + + req := ®istry.GetOnDemandFeatureViewRequest{ + Name: name, + Project: grs.project, + AllowCache: allowCache, + } + + odfv, err := grs.client.GetOnDemandFeatureView(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get on-demand feature view %s: %w", name, err) + } + + return odfv, nil +} + +func (grs *GrpcRegistryStore) GetRegistryProto() (*core.Registry, error) { + // gRPC stores use fallback mode, so this returns empty registry + registry := core.Registry{} + return ®istry, nil +} + +func (grs *GrpcRegistryStore) UpdateRegistryProto(rp *core.Registry) error { + return &NotImplementedError{FunctionName: "UpdateRegistryProto"} +} + +func (grs *GrpcRegistryStore) Teardown() error { + if grs.conn != nil { + return grs.conn.Close() + } + return nil +} + +func (grs *GrpcRegistryStore) HasFallback() bool { + return true +} diff --git a/go/internal/feast/registry/http.go b/go/internal/feast/registry/http.go index b6474e48dcc..c4554f39cbd 100644 --- a/go/internal/feast/registry/http.go +++ b/go/internal/feast/registry/http.go @@ -8,6 +8,7 @@ import ( "time" "github.com/feast-dev/feast/go/protos/feast/core" + "github.com/rs/zerolog/log" "google.golang.org/protobuf/proto" ) @@ -107,7 +108,7 @@ func (r *HttpRegistryStore) loadProtobufMessages(url string, messageProcessor fu return nil } -func (r *HttpRegistryStore) getFeatureService(name string, allowCache bool) (*core.FeatureService, error) { +func (r *HttpRegistryStore) GetFeatureService(name string, allowCache bool) (*core.FeatureService, error) { url := fmt.Sprintf("%s/projects/%s/feature_services/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache) featureService := &core.FeatureService{} err := r.loadProtobufMessages(url, func(data []byte) error { @@ -123,7 +124,7 @@ func (r *HttpRegistryStore) getFeatureService(name string, allowCache bool) (*co return featureService, nil } -func (r *HttpRegistryStore) getEntity(name string, allowCache bool) (*core.Entity, error) { +func (r *HttpRegistryStore) GetEntity(name string, allowCache bool) (*core.Entity, error) { url := fmt.Sprintf("%s/projects/%s/entities/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache) entity := &core.Entity{} err := r.loadProtobufMessages(url, func(data []byte) error { @@ -140,7 +141,7 @@ func (r *HttpRegistryStore) getEntity(name string, allowCache bool) (*core.Entit return entity, nil } -func (r *HttpRegistryStore) getFeatureView(name string, allowCache bool) (*core.FeatureView, error) { +func (r *HttpRegistryStore) GetFeatureView(name string, allowCache bool) (*core.FeatureView, error) { url := fmt.Sprintf("%s/projects/%s/feature_views/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache) featureView := &core.FeatureView{} err := r.loadProtobufMessages(url, func(data []byte) error { @@ -156,7 +157,7 @@ func (r *HttpRegistryStore) getFeatureView(name string, allowCache bool) (*core. return featureView, nil } -func (r *HttpRegistryStore) getOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) { +func (r *HttpRegistryStore) GetOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) { url := fmt.Sprintf("%s/projects/%s/on_demand_feature_views/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache) onDemandFeatureView := &core.OnDemandFeatureView{} err := r.loadProtobufMessages(url, func(data []byte) error { @@ -172,7 +173,7 @@ func (r *HttpRegistryStore) getOnDemandFeatureView(name string, allowCache bool) return onDemandFeatureView, nil } -func (r *HttpRegistryStore) getSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) { +func (r *HttpRegistryStore) GetSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) { url := fmt.Sprintf("%s/projects/%s/sorted_feature_views/%s?allow_cache=%t", r.endpoint, r.project, name, allowCache) sortedFeatureView := &core.SortedFeatureView{} err := r.loadProtobufMessages(url, func(data []byte) error { diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index a5a23e85976..fac38d61dd0 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -21,6 +21,7 @@ var REGISTRY_STORE_CLASS_FOR_SCHEME map[string]string = map[string]string{ "file": "FileRegistryStore", "http": "HttpRegistryStore", "https": "HttpRegistryStore", + "grpc": "GrpcRegistryStore", "": "FileRegistryStore", } @@ -182,7 +183,7 @@ func (r *Registry) InitializeRegistry() error { registryProto, err := r.registryStore.GetRegistryProto() if err != nil { switch r.registryStore.(type) { - case *FileRegistryStore, *HttpRegistryStore: + case *FileRegistryStore, *HttpRegistryStore, *GrpcRegistryStore: // TODO: Extend to support remote registry log.Error().Err(err).Msg("Registry Initialization Failed") return err default: @@ -210,11 +211,11 @@ func (r *Registry) RefreshRegistryOnInterval() { func (r *Registry) refresh() error { if r.registryStore.HasFallback() { - r.cachedEntities.expireCachedModels(r.GetEntityFromRegistry) - r.cachedFeatureServices.expireCachedModels(r.GetFeatureServiceFromRegistry) - r.cachedFeatureViews.expireCachedModels(r.GetFeatureViewFromRegistry) - r.cachedSortedFeatureViews.expireCachedModels(r.GetSortedFeatureViewFromRegistry) - r.cachedOnDemandFeatureViews.expireCachedModels(r.GetOnDemandFeatureViewFromRegistry) + r.cachedEntities.expireCachedModels(r.GetEntity) + r.cachedFeatureServices.expireCachedModels(r.GetFeatureService) + r.cachedFeatureViews.expireCachedModels(r.GetFeatureView) + r.cachedSortedFeatureViews.expireCachedModels(r.GetSortedFeatureView) + r.cachedOnDemandFeatureViews.expireCachedModels(r.GetOnDemandFeatureView) } else { registryProto, err := r.registryStore.GetRegistryProto() if err != nil { @@ -264,53 +265,37 @@ func loadModels[U any, T any](protoList []U, cachedModels *cacheMap[T], modelFac } } -func (r *Registry) GetEntity(project string, entityName string) (*model.Entity, error) { - if r.registryStore.HasFallback() { - return r.cachedEntities.getOrLoad(project, entityName, r.GetEntityFromRegistry) - } - - if entity, ok := r.cachedEntities.get(project, entityName); ok { - return entity, nil +func (r *Registry) GetEntity(entityName string, project string) (*model.Entity, error) { + remoteStore, ok := r.registryStore.(RemoteRegistryStore) + if !ok { + return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - return nil, errors.GrpcNotFoundErrorf("no cached entity %s found for project %s", entityName, project) -} - -func (r *Registry) GetEntityFromRegistry(entityName string, project string) (*model.Entity, error) { - entityProto, err := r.registryStore.(*HttpRegistryStore).getEntity(entityName, true) + entityProto, err := remoteStore.getEntity(entityName, true) if err != nil { - if errors.IsHTTPNotFoundError(err) { + if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no entity %s found in project %s", entityName, project) return nil, errors.GrpcNotFoundErrorf("no entity %s found in project %s", entityName, project) } - - log.Error().Err(err).Msgf("no entity %s found in project %s", entityName, project) + log.Error().Err(err).Msgf("error retrieving entity %s in project %s", entityName, project) return nil, errors.GrpcInternalErrorf("error retrieving entity %s in project %s: %v", entityName, project, err) } return model.NewEntityFromProto(entityProto), nil } -func (r *Registry) GetFeatureView(project string, featureViewName string) (*model.FeatureView, error) { - if r.registryStore.HasFallback() { - return r.cachedFeatureViews.getOrLoad(project, featureViewName, r.GetFeatureViewFromRegistry) - } - - if cachedFeatureView, ok := r.cachedFeatureViews.get(project, featureViewName); ok { - return cachedFeatureView, nil +func (r *Registry) GetFeatureView(featureViewName string, project string) (*model.FeatureView, error) { + remoteStore, ok := r.registryStore.(RemoteRegistryStore) + if !ok { + return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - return nil, errors.GrpcNotFoundErrorf("no cached feature view %s found for project %s", featureViewName, project) -} - -func (r *Registry) GetFeatureViewFromRegistry(featureViewName string, project string) (*model.FeatureView, error) { - featureViewProto, err := r.registryStore.(*HttpRegistryStore).getFeatureView(featureViewName, true) + featureViewProto, err := remoteStore.getFeatureView(featureViewName, true) if err != nil { - if errors.IsHTTPNotFoundError(err) { + if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no feature view %s found in project %s", featureViewName, project) return nil, errors.GrpcNotFoundErrorf("no feature view %s found in project %s", featureViewName, project) } - log.Error().Err(err).Msgf("error retrieving feature view %s in project %s", featureViewName, project) return nil, errors.GrpcInternalErrorf("error retrieving feature view %s in project %s: %v", featureViewName, project, err) } @@ -318,26 +303,18 @@ func (r *Registry) GetFeatureViewFromRegistry(featureViewName string, project st return model.NewFeatureViewFromProto(featureViewProto), nil } -func (r *Registry) GetSortedFeatureView(project string, sortedFeatureViewName string) (*model.SortedFeatureView, error) { - if r.registryStore.HasFallback() { - return r.cachedSortedFeatureViews.getOrLoad(project, sortedFeatureViewName, r.GetSortedFeatureViewFromRegistry) - } - - if cachedSortedFeatureView, ok := r.cachedSortedFeatureViews.get(project, sortedFeatureViewName); ok { - return cachedSortedFeatureView, nil +func (r *Registry) GetSortedFeatureView(sortedFeatureViewName string, project string) (*model.SortedFeatureView, error) { + remoteStore, ok := r.registryStore.(RemoteRegistryStore) + if !ok { + return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - return nil, errors.GrpcNotFoundErrorf("no cached sorted feature view %s found for project %s", sortedFeatureViewName, project) -} - -func (r *Registry) GetSortedFeatureViewFromRegistry(sortedFeatureViewName string, project string) (*model.SortedFeatureView, error) { - sortedFeatureViewProto, err := r.registryStore.(*HttpRegistryStore).getSortedFeatureView(sortedFeatureViewName, true) + sortedFeatureViewProto, err := remoteStore.getSortedFeatureView(sortedFeatureViewName, true) if err != nil { - if errors.IsHTTPNotFoundError(err) { + if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no sorted feature view %s found in project %s", sortedFeatureViewName, project) return nil, errors.GrpcNotFoundErrorf("no sorted feature view %s found in project %s", sortedFeatureViewName, project) } - log.Error().Err(err).Msgf("error retrieving sorted feature view %s in project %s", sortedFeatureViewName, project) return nil, errors.GrpcInternalErrorf("error retrieving sorted feature view %s in project %s: %v", sortedFeatureViewName, project, err) } @@ -345,26 +322,18 @@ func (r *Registry) GetSortedFeatureViewFromRegistry(sortedFeatureViewName string return model.NewSortedFeatureViewFromProto(sortedFeatureViewProto), nil } -func (r *Registry) GetFeatureService(project string, featureServiceName string) (*model.FeatureService, error) { - if r.registryStore.HasFallback() { - return r.cachedFeatureServices.getOrLoad(project, featureServiceName, r.GetFeatureServiceFromRegistry) +func (r *Registry) GetFeatureService(featureServiceName string, project string) (*model.FeatureService, error) { + remoteStore, ok := r.registryStore.(RemoteRegistryStore) + if !ok { + return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - if cachedFeatureService, ok := r.cachedFeatureServices.get(project, featureServiceName); ok { - return cachedFeatureService, nil - } - - return nil, errors.GrpcNotFoundErrorf("no cached feature service %s found for project %s", featureServiceName, project) -} - -func (r *Registry) GetFeatureServiceFromRegistry(featureServiceName string, project string) (*model.FeatureService, error) { - featureServiceProto, err := r.registryStore.(*HttpRegistryStore).getFeatureService(featureServiceName, true) + featureServiceProto, err := remoteStore.getFeatureService(featureServiceName, true) if err != nil { - if errors.IsHTTPNotFoundError(err) { + if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no feature service %s found in project %s", featureServiceName, project) return nil, errors.GrpcNotFoundErrorf("no feature service %s found in project %s", featureServiceName, project) } - log.Error().Err(err).Msgf("error retrieving feature service %s in project %s", featureServiceName, project) return nil, errors.GrpcInternalErrorf("error retrieving feature service %s in project %s: %v", featureServiceName, project, err) } @@ -372,26 +341,18 @@ func (r *Registry) GetFeatureServiceFromRegistry(featureServiceName string, proj return model.NewFeatureServiceFromProto(featureServiceProto), nil } -func (r *Registry) GetOnDemandFeatureView(project string, onDemandFeatureViewName string) (*model.OnDemandFeatureView, error) { - if r.registryStore.HasFallback() { - return r.cachedOnDemandFeatureViews.getOrLoad(project, onDemandFeatureViewName, r.GetOnDemandFeatureViewFromRegistry) +func (r *Registry) GetOnDemandFeatureView(onDemandFeatureViewName string, project string) (*model.OnDemandFeatureView, error) { + remoteStore, ok := r.registryStore.(RemoteRegistryStore) + if !ok { + return nil, errors.GrpcInternalErrorf("registry store does not support remote operations") } - if cachedOnDemandFeatureView, ok := r.cachedOnDemandFeatureViews.get(project, onDemandFeatureViewName); ok { - return cachedOnDemandFeatureView, nil - } - - return nil, errors.GrpcNotFoundErrorf("no cached on demand feature view %s found for project %s", onDemandFeatureViewName, project) -} - -func (r *Registry) GetOnDemandFeatureViewFromRegistry(onDemandFeatureViewName string, project string) (*model.OnDemandFeatureView, error) { - onDemandFeatureViewProto, err := r.registryStore.(*HttpRegistryStore).getOnDemandFeatureView(onDemandFeatureViewName, true) + onDemandFeatureViewProto, err := remoteStore.getOnDemandFeatureView(onDemandFeatureViewName, true) if err != nil { - if errors.IsHTTPNotFoundError(err) { + if errors.IsHTTPNotFoundError(err) || errors.IsGrpcNotFoundError(err) { log.Error().Err(err).Msgf("no on demand feature view %s found in project %s", onDemandFeatureViewName, project) return nil, errors.GrpcNotFoundErrorf("no on demand feature view %s found in project %s", onDemandFeatureViewName, project) } - log.Error().Err(err).Msgf("error retrieving on demand feature view %s in project %s", onDemandFeatureViewName, project) return nil, errors.GrpcInternalErrorf("error retrieving on demand feature view %s in project %s: %v", onDemandFeatureViewName, project, err) } @@ -412,10 +373,12 @@ func getRegistryStoreFromScheme(registryPath string, registryConfig *RegistryCon func getRegistryStoreFromType(registryStoreType string, registryConfig *RegistryConfig, repoPath string, project string) (RegistryStore, error) { switch registryStoreType { - case "FileRegistryStore": - return NewFileRegistryStore(registryConfig, repoPath), nil case "HttpRegistryStore": return NewHttpRegistryStore(registryConfig, project) + case "GrpcRegistryStore": + return NewGrpcRegistryStore(registryConfig, project) + case "FileRegistryStore": + return NewFileRegistryStore(registryConfig, repoPath), nil case "S3RegistryStore": return NewS3RegistryStore(registryConfig, repoPath), nil } diff --git a/go/internal/feast/registry/registrystore.go b/go/internal/feast/registry/registrystore.go index c9c84508b7c..7ff45969d41 100644 --- a/go/internal/feast/registry/registrystore.go +++ b/go/internal/feast/registry/registrystore.go @@ -11,3 +11,13 @@ type RegistryStore interface { Teardown() error HasFallback() bool } + +type RemoteRegistryStore interface { + RegistryStore // Add base interface for composition. + + getEntity(name string, allowCache bool) (*core.Entity, error) + getFeatureView(name string, allowCache bool) (*core.FeatureView, error) + getSortedFeatureView(name string, allowCache bool) (*core.SortedFeatureView, error) + getFeatureService(name string, allowCache bool) (*core.FeatureService, error) + getOnDemandFeatureView(name string, allowCache bool) (*core.OnDemandFeatureView, error) +}