From 65a52ff6fc4146cafece22c5e144ae2725d3a5df Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Fri, 26 Jun 2026 13:35:05 -0500 Subject: [PATCH 1/3] fix: use config-driven entity key serialization version in Cassandra and Valkey stores MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cassandra was hardcoding serialization version 2, which silently diverged from the Python default (v3) and from the Valkey store. Both stores now resolve the version from feature_store.yaml (entity_key_serialization_version, defaulting to v3 when unset), matching Python's SerializeEntityKey behavior. Also adds a shared warnPotentialEntityKeyVersionMismatch helper (extracted into entitykeyserialization.go) that fires a single deduplicated warning when an online read returns zero data for all requested feature views — a possible indicator of a write/read version mismatch. Covered by new unit tests in both store test files. Co-Authored-By: Claude Sonnet 4.6 --- .../feast/onlinestore/cassandraonlinestore.go | 66 ++++++++- .../onlinestore/cassandraonlinestore_test.go | 127 ++++++++++++++++++ .../feast/onlinestore/egvalkeyonlinestore.go | 57 ++++++++ .../onlinestore/egvalkeyonlinestore_test.go | 56 ++++++++ .../onlinestore/entitykeyserialization.go | 53 ++++++++ 5 files changed, 357 insertions(+), 2 deletions(-) create mode 100644 go/internal/feast/onlinestore/entitykeyserialization.go diff --git a/go/internal/feast/onlinestore/cassandraonlinestore.go b/go/internal/feast/onlinestore/cassandraonlinestore.go index 09f3ddac4af..451f8c4ce46 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore.go @@ -11,6 +11,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "time" "github.com/feast-dev/feast/go/internal/feast/model" @@ -45,6 +46,8 @@ type CassandraOnlineStore struct { // Caches table names instead of generating the table name every time tableNameCache sync.Map + + versionMismatchWarned sync.Map } type CassandraConfig struct { @@ -393,7 +396,7 @@ func (c *CassandraOnlineStore) buildCassandraEntityKeys(entityKeys []*types.Enti cassandraKeys := make([]any, len(entityKeys)) cassandraKeyToEntityIndex := make(map[string]int) for i := 0; i < len(entityKeys); i++ { - var key, err = utils.SerializeEntityKey(entityKeys[i], 2) + var key, err = utils.SerializeEntityKey(entityKeys[i], c.resolvedEntityKeySerializationVersion()) if err != nil { return nil, nil, err } @@ -404,6 +407,15 @@ func (c *CassandraOnlineStore) buildCassandraEntityKeys(entityKeys []*types.Enti return cassandraKeys, cassandraKeyToEntityIndex, nil } +func (c *CassandraOnlineStore) resolvedEntityKeySerializationVersion() int64 { + return resolveEntityKeySerializationVersion(c.config) +} + +func (c *CassandraOnlineStore) warnPotentialVersionMismatch(views []string, numKeys int) { + warnPotentialEntityKeyVersionMismatch( + &c.versionMismatchWarned, "Cassandra", c.resolvedEntityKeySerializationVersion(), views, numKeys) +} + func (c *CassandraOnlineStore) validateUniqueFeatureNames(featureViewNames []string) error { uniqueNames := make(map[string]int32) for _, fvName := range featureViewNames { @@ -499,7 +511,9 @@ func (c *CassandraOnlineStore) executeBatchV2( var deserializedValue *types.Value batchFeatures := make(map[string]map[string]*FeatureData) + rowsScanned := 0 for scanner.Next() { + rowsScanned++ err := scanner.Scan(&entityKey, &featureName, &eventTs, &valueStr) if err != nil { return nil, fmt.Errorf("could not read row in query for (entity key, feature name, value, event ts): %w", err) @@ -530,6 +544,15 @@ func (c *CassandraOnlineStore) executeBatchV2( return nil, fmt.Errorf("failed to scan features: %w", err) } + // OnlineReadV2 issues a single batch that covers this feature view's full set of entity + // keys, so rowsScanned == 0 here means the entire request missed for the only view + // involved. That is exactly the "all requested views missed" condition, so warning + // directly from this single-batch path is safe (unlike the V1 OnlineRead path, which + // splits keys across batches and must aggregate results before deciding). + if rowsScanned == 0 { + c.warnPotentialVersionMismatch([]string{job.ViewName}, len(job.EntityKeys)) + } + for _, serializedEntityKey := range job.EntityKeys { for _, featName := range job.FeatureNames { keyString := serializedEntityKey.(string) @@ -591,6 +614,10 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ batches := c.createBatches(serializedEntityKeys) + // viewsWithData records which feature views had at least one row returned by Cassandra. + // Used after all batches complete to detect per-view complete misses (possible version mismatch). + viewsWithData := &sync.Map{} + g.Go(func() error { defer close(jobsChan) @@ -635,7 +662,7 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ for job := range jobsChan { g.Go(func(j BatchJob) func() error { return func() error { - return c.executeBatch(ctx, j, serializedEntityKeyToIndex, results, featureNamesToIdx) + return c.executeBatch(ctx, j, serializedEntityKeyToIndex, results, featureNamesToIdx, viewsWithData) } }(job)) } @@ -644,6 +671,20 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ return nil, err } + // After all batches: only warn if EVERY requested feature view returned zero rows. A + // single view missing is normal (sparse/cold entities); a complete miss across the whole + // request is a stronger — though still not conclusive — signal of a possible + // serialization version mismatch. Warn once (deduped) for the full set of views. + missedViews := make([]string, 0, len(viewGroups)) + for viewName := range viewGroups { + if _, hasData := viewsWithData.Load(viewName); !hasData { + missedViews = append(missedViews, viewName) + } + } + if len(viewGroups) > 0 && len(missedViews) == len(viewGroups) { + c.warnPotentialVersionMismatch(missedViews, len(entityKeys)) + } + return results, nil } @@ -653,6 +694,7 @@ func (c *CassandraOnlineStore) executeBatch( serializedEntityKeyToIndex map[string]int, results [][]FeatureData, featureNamesToIdx map[string]int, + viewsWithData *sync.Map, ) error { iter := c.session.Query(job.CQLStatement, job.EntityKeys...).WithContext(ctx).Iter() defer iter.Close() @@ -665,7 +707,9 @@ func (c *CassandraOnlineStore) executeBatch( var deserializedValue *types.Value batchFeatures := make(map[string]map[string]*FeatureData) + rowsScanned := 0 for scanner.Next() { + rowsScanned++ err := scanner.Scan(&entityKey, &featureName, &eventTs, &valueStr) if err != nil { return fmt.Errorf("could not read row in query for (entity key, feature name, value, event ts): %w", err) @@ -696,6 +740,12 @@ func (c *CassandraOnlineStore) executeBatch( return fmt.Errorf("failed to scan features: %w", err) } + // Mark this feature view as having returned data so OnlineRead can determine + // per-view whether a complete miss occurred across all batches. + if rowsScanned > 0 && viewsWithData != nil { + viewsWithData.Store(job.ViewName, struct{}{}) + } + for _, serializedEntityKey := range job.EntityKeys { for _, featName := range job.FeatureNames { keyString := serializedEntityKey.(string) @@ -914,6 +964,11 @@ func (c *CassandraOnlineStore) OnlineReadRange(ctx context.Context, groupedRefs var waitGroup sync.WaitGroup errorsChannel := make(chan error, nBatches) + // sawAnyRow is set to true by the first goroutine that returns at least one row. + // If it remains false after all batches finish, every entity key got a complete miss — + // a possible indicator of a serialization version mismatch. + var sawAnyRow atomic.Bool + canonicalFeats := make([]string, len(groupedRefs.FeatureNames)) isSortKey := make([]bool, len(groupedRefs.FeatureNames)) for i, name := range groupedRefs.FeatureNames { @@ -950,6 +1005,7 @@ func (c *CassandraOnlineStore) OnlineReadRange(ctx context.Context, groupedRefs continue } + sawAnyRow.Store(true) rowData := results[rowIdx] for i, featName := range groupedRefs.FeatureNames { @@ -1011,6 +1067,12 @@ func (c *CassandraOnlineStore) OnlineReadRange(ctx context.Context, groupedRefs return nil, errors.Join(allErrors...) } + // OnlineReadRange handles a single feature view, so a complete miss across all batches is + // the "all requested views missed" condition. Warn once if this looks like a version mismatch. + if !sawAnyRow.Load() { + c.warnPotentialVersionMismatch([]string{prepCtx.featureViewName}, len(groupedRefs.EntityKeys)) + } + return results, nil } diff --git a/go/internal/feast/onlinestore/cassandraonlinestore_test.go b/go/internal/feast/onlinestore/cassandraonlinestore_test.go index f9f4998666f..89c3722d6f1 100644 --- a/go/internal/feast/onlinestore/cassandraonlinestore_test.go +++ b/go/internal/feast/onlinestore/cassandraonlinestore_test.go @@ -3,16 +3,20 @@ package onlinestore import ( + "encoding/hex" "fmt" "reflect" "testing" "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/feast-dev/feast/go/internal/feast/utils" "github.com/feast-dev/feast/go/protos/feast/core" "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" "github.com/gocql/gocql" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" ) @@ -539,3 +543,126 @@ func TestResolveFeatureValue_AlreadyLowercaseFeature(t *testing.T) { assert.Equal(t, serving.FieldStatus_PRESENT, status) assert.Equal(t, "hello", val.(*types.Value).GetStringVal()) } + +func sampleEntityKey() *types.EntityKey { + return &types.EntityKey{ + JoinKeys: []string{"user_id"}, + EntityValues: []*types.Value{ + {Val: &types.Value_Int64Val{Int64Val: 42}}, + }, + } +} + +func serializedHex(t *testing.T, key *types.EntityKey, version int64) string { + t.Helper() + b, err := utils.SerializeEntityKey(key, version) + require.NoError(t, err) + return hex.EncodeToString(*b) +} + +func TestBuildCassandraEntityKeys_UsesConfiguredVersion(t *testing.T) { + key := sampleEntityKey() + + for _, tc := range []struct { + name string + version int64 + }{ + {"version2", 2}, + {"version3", 3}, + } { + t.Run(tc.name, func(t *testing.T) { + store := &CassandraOnlineStore{ + config: ®istry.RepoConfig{ + EntityKeySerializationVersion: tc.version, + }, + } + + cassandraKeys, keyToIdx, err := store.buildCassandraEntityKeys([]*types.EntityKey{key}) + require.NoError(t, err) + require.Len(t, cassandraKeys, 1) + + gotHex := cassandraKeys[0].(string) + wantHex := serializedHex(t, key, tc.version) + + assert.Equal(t, wantHex, gotHex, + "hex key for version %d should match utils.SerializeEntityKey output", tc.version) + assert.Equal(t, 0, keyToIdx[gotHex]) + }) + } + + storeV2 := &CassandraOnlineStore{config: ®istry.RepoConfig{EntityKeySerializationVersion: 2}} + storeV3 := &CassandraOnlineStore{config: ®istry.RepoConfig{EntityKeySerializationVersion: 3}} + keysV2, _, _ := storeV2.buildCassandraEntityKeys([]*types.EntityKey{key}) + keysV3, _, _ := storeV3.buildCassandraEntityKeys([]*types.EntityKey{key}) + assert.NotEqual(t, keysV2[0], keysV3[0], + "v2 and v3 serialized keys must differ for the same entity key") +} + +func TestBuildCassandraEntityKeys_ZeroVersionDefaultsToV3(t *testing.T) { + key := sampleEntityKey() + + store := &CassandraOnlineStore{ + config: ®istry.RepoConfig{EntityKeySerializationVersion: 0}, + } + keys, _, err := store.buildCassandraEntityKeys([]*types.EntityKey{key}) + require.NoError(t, err) + + wantHex := serializedHex(t, key, 3) + assert.Equal(t, wantHex, keys[0].(string), "zero version should produce v3 keys") +} + +func TestResolvedEntityKeySerializationVersion(t *testing.T) { + assert.Equal(t, int64(3), (&CassandraOnlineStore{config: ®istry.RepoConfig{EntityKeySerializationVersion: 0}}).resolvedEntityKeySerializationVersion()) + assert.Equal(t, int64(2), (&CassandraOnlineStore{config: ®istry.RepoConfig{EntityKeySerializationVersion: 2}}).resolvedEntityKeySerializationVersion()) + assert.Equal(t, int64(3), (&CassandraOnlineStore{config: ®istry.RepoConfig{EntityKeySerializationVersion: 3}}).resolvedEntityKeySerializationVersion()) + assert.Equal(t, int64(3), (&CassandraOnlineStore{config: nil}).resolvedEntityKeySerializationVersion(), "nil config should default to v3 without panicking") +} + +func TestWarnPotentialVersionMismatch_DeduplicatesPerRequest(t *testing.T) { + store := &CassandraOnlineStore{ + config: ®istry.RepoConfig{EntityKeySerializationVersion: 3}, + } + + const view = "my_feature_view" + + store.warnPotentialVersionMismatch([]string{view}, 5) + _, warned := store.versionMismatchWarned.Load(view) + assert.True(t, warned, "view should be recorded after first call") + + store.warnPotentialVersionMismatch([]string{view}, 5) + count := 0 + store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true }) + assert.Equal(t, 1, count, "only one entry should exist for the view after repeated calls") + + const otherView = "other_feature_view" + store.warnPotentialVersionMismatch([]string{otherView}, 3) + count = 0 + store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true }) + assert.Equal(t, 2, count, "each distinct request should have its own warning entry") +} + +func TestWarnPotentialVersionMismatch_MultiViewDedupIsOrderIndependent(t *testing.T) { + store := &CassandraOnlineStore{ + config: ®istry.RepoConfig{EntityKeySerializationVersion: 3}, + } + + store.warnPotentialVersionMismatch([]string{"view_a", "view_b"}, 4) + store.warnPotentialVersionMismatch([]string{"view_b", "view_a"}, 4) + + count := 0 + store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true }) + assert.Equal(t, 1, count, "same set of views in any order should dedupe to one entry") +} + +func TestWarnPotentialVersionMismatch_EmptyViewsIsNoop(t *testing.T) { + store := &CassandraOnlineStore{ + config: ®istry.RepoConfig{EntityKeySerializationVersion: 3}, + } + + store.warnPotentialVersionMismatch(nil, 5) + store.warnPotentialVersionMismatch([]string{}, 5) + + count := 0 + store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true }) + assert.Equal(t, 0, count, "no warning entry should be recorded for an empty view set") +} diff --git a/go/internal/feast/onlinestore/egvalkeyonlinestore.go b/go/internal/feast/onlinestore/egvalkeyonlinestore.go index a97a4fac3a2..aabed047ccc 100644 --- a/go/internal/feast/onlinestore/egvalkeyonlinestore.go +++ b/go/internal/feast/onlinestore/egvalkeyonlinestore.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/internal/feast/utils" @@ -53,6 +54,10 @@ type ValkeyOnlineStore struct { // Number of keys to read in a batch ReadBatchSize int + + // Tracks request shapes for which a potential serialization-version-mismatch warning has + // already been emitted, so the warning fires at most once per shape per process lifetime. + versionMismatchWarned sync.Map } func parseConnectionString(onlineStoreConfig map[string]interface{}, valkeyStoreType valkeyType) (valkey.ClientOption, error) { @@ -257,6 +262,15 @@ func (v *ValkeyOnlineStore) buildValkeyKeys(entityKeys []*types.EntityKey) ([]*[ return valkeyKeys, nil } +func (v *ValkeyOnlineStore) resolvedEntityKeySerializationVersion() int64 { + return resolveEntityKeySerializationVersion(v.config) +} + +func (v *ValkeyOnlineStore) warnPotentialVersionMismatch(views []string, numKeys int) { + warnPotentialEntityKeyVersionMismatch( + &v.versionMismatchWarned, "Valkey", v.resolvedEntityKeySerializationVersion(), views, numKeys) +} + func (v *ValkeyOnlineStore) OnlineReadV2(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) { return v.OnlineRead(ctx, entityKeys, featureViewNames, featureNames) } @@ -282,6 +296,9 @@ func (v *ValkeyOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types. } var resContainsNonNil bool + // viewsWithData records which feature views returned at least one non-nil value across all + // entity keys. Used after the read completes to detect a complete miss (possible version mismatch). + viewsWithData := make(map[string]bool) for entityIndex, values := range v.client.DoMulti(ctx, cmds...) { if err := values.Error(); err != nil { @@ -324,6 +341,7 @@ func (v *ValkeyOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types. if value, _, err = utils.UnmarshalStoredProto([]byte(valueString)); err != nil { return nil, errors.New("error converting parsed valkey Value to types.Value") } + viewsWithData[featureViewName] = true } if _, ok := timeStampMap[featureViewName]; !ok { @@ -353,6 +371,23 @@ func (v *ValkeyOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types. } } + // Only warn if EVERY requested feature view returned zero data. A single sparse view is + // normal; a complete miss across the whole request is a stronger — though not conclusive — + // signal of a possible serialization version mismatch. Warn once (deduped) per request shape. + requestedViews := make(map[string]struct{}) + for _, viewName := range featureViewNames { + requestedViews[viewName] = struct{}{} + } + missedViews := make([]string, 0, len(requestedViews)) + for viewName := range requestedViews { + if !viewsWithData[viewName] { + missedViews = append(missedViews, viewName) + } + } + if len(requestedViews) > 0 && len(missedViews) == len(requestedViews) { + v.warnPotentialVersionMismatch(missedViews, len(entityKeys)) + } + return results, nil } @@ -550,6 +585,7 @@ func (v *ValkeyOnlineStore) processEntityKey( limit int64, results [][]RangeFeatureData, featNames, fvNames []string, + sawAnyRow *atomic.Bool, ) error { select { case <-ctx.Done(): @@ -650,6 +686,11 @@ func (v *ValkeyOnlineStore) processEntityKey( } continue } + // At least one stored member exists for this entity/feature view — record that the + // request returned data so OnlineReadRange can detect a complete miss across all keys. + if sawAnyRow != nil { + sawAnyRow.Store(true) + } // build list of hash fields to retrieve fields := append(append([]string{}, grp.FieldHashes...), grp.TsKey) if err := valkeyBatchHMGET( @@ -739,6 +780,11 @@ func (v *ValkeyOnlineStore) OnlineReadRange( results := make([][]RangeFeatureData, len(groupedRefs.EntityKeys)) + // sawAnyRow is set by the first entity goroutine that finds at least one stored member. + // If it remains false after all goroutines finish, every entity key got a complete miss — + // a possible (not guaranteed) indicator of a serialization version mismatch. + var sawAnyRow atomic.Bool + var wg sync.WaitGroup errChan := make(chan error, len(groupedRefs.EntityKeys)) @@ -757,6 +803,7 @@ func (v *ValkeyOnlineStore) OnlineReadRange( limit, results, featNames, fvNames, + &sawAnyRow, ); err != nil { errChan <- err } @@ -776,6 +823,16 @@ func (v *ValkeyOnlineStore) OnlineReadRange( if len(allErrors) > 0 { return nil, errors.Join(allErrors...) } + + // Complete miss across all entity keys — warn once if this looks like a version mismatch. + if !sawAnyRow.Load() { + requestedViews := make([]string, 0, len(fvGroups)) + for fv := range fvGroups { + requestedViews = append(requestedViews, fv) + } + v.warnPotentialVersionMismatch(requestedViews, len(groupedRefs.EntityKeys)) + } + return results, nil } diff --git a/go/internal/feast/onlinestore/egvalkeyonlinestore_test.go b/go/internal/feast/onlinestore/egvalkeyonlinestore_test.go index 125f39f810b..30a391545d6 100644 --- a/go/internal/feast/onlinestore/egvalkeyonlinestore_test.go +++ b/go/internal/feast/onlinestore/egvalkeyonlinestore_test.go @@ -355,3 +355,59 @@ func TestGetValkeyTraceServiceName(t *testing.T) { os.Unsetenv("DD_SERVICE") }) } + +func TestValkeyResolvedEntityKeySerializationVersion(t *testing.T) { + assert.Equal(t, int64(3), (&ValkeyOnlineStore{config: ®istry.RepoConfig{EntityKeySerializationVersion: 0}}).resolvedEntityKeySerializationVersion()) + assert.Equal(t, int64(2), (&ValkeyOnlineStore{config: ®istry.RepoConfig{EntityKeySerializationVersion: 2}}).resolvedEntityKeySerializationVersion()) + assert.Equal(t, int64(3), (&ValkeyOnlineStore{config: ®istry.RepoConfig{EntityKeySerializationVersion: 3}}).resolvedEntityKeySerializationVersion()) + assert.Equal(t, int64(3), (&ValkeyOnlineStore{config: nil}).resolvedEntityKeySerializationVersion(), "nil config should default to v3 without panicking") +} + +func TestValkeyWarnPotentialVersionMismatch_DeduplicatesPerRequest(t *testing.T) { + store := &ValkeyOnlineStore{ + config: ®istry.RepoConfig{EntityKeySerializationVersion: 3}, + } + + const view = "my_feature_view" + + store.warnPotentialVersionMismatch([]string{view}, 5) + _, warned := store.versionMismatchWarned.Load(view) + assert.True(t, warned, "view should be recorded after first call") + + store.warnPotentialVersionMismatch([]string{view}, 5) + count := 0 + store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true }) + assert.Equal(t, 1, count, "only one entry should exist for the view after repeated calls") + + const otherView = "other_feature_view" + store.warnPotentialVersionMismatch([]string{otherView}, 3) + count = 0 + store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true }) + assert.Equal(t, 2, count, "each distinct request should have its own warning entry") +} + +func TestValkeyWarnPotentialVersionMismatch_MultiViewDedupIsOrderIndependent(t *testing.T) { + store := &ValkeyOnlineStore{ + config: ®istry.RepoConfig{EntityKeySerializationVersion: 3}, + } + + store.warnPotentialVersionMismatch([]string{"view_a", "view_b"}, 4) + store.warnPotentialVersionMismatch([]string{"view_b", "view_a"}, 4) + + count := 0 + store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true }) + assert.Equal(t, 1, count, "same set of views in any order should dedupe to one entry") +} + +func TestValkeyWarnPotentialVersionMismatch_EmptyViewsIsNoop(t *testing.T) { + store := &ValkeyOnlineStore{ + config: ®istry.RepoConfig{EntityKeySerializationVersion: 3}, + } + + store.warnPotentialVersionMismatch(nil, 5) + store.warnPotentialVersionMismatch([]string{}, 5) + + count := 0 + store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true }) + assert.Equal(t, 0, count, "no warning entry should be recorded for an empty view set") +} diff --git a/go/internal/feast/onlinestore/entitykeyserialization.go b/go/internal/feast/onlinestore/entitykeyserialization.go new file mode 100644 index 00000000000..cba5573e25e --- /dev/null +++ b/go/internal/feast/onlinestore/entitykeyserialization.go @@ -0,0 +1,53 @@ +package onlinestore + +import ( + "sort" + "strings" + "sync" + + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/rs/zerolog/log" +) + +// resolveEntityKeySerializationVersion returns the entity key serialization version that an +// online store should use for reads. It mirrors utils.SerializeEntityKey (and Python's +// default): a 0/unset version resolves to v3. It is nil-safe so it can be called on stores +// constructed without a config (e.g. in tests). +func resolveEntityKeySerializationVersion(config *registry.RepoConfig) int64 { + if config != nil && config.EntityKeySerializationVersion != 0 { + return config.EntityKeySerializationVersion + } + return 3 +} + +// warnPotentialEntityKeyVersionMismatch emits a single, de-duplicated warning when an online +// read returned no data for every requested feature view. A complete miss across the whole +// request is a *potential* (not guaranteed) symptom of an entity_key_serialization_version +// mismatch between the write (materialization) and read (feature server) paths. It can also +// legitimately mean the requested entities simply are not present (e.g. not yet materialized +// or expired), so the message is intentionally non-alarming. +// +// De-duplication is keyed on the sorted set of views (tracked in warned) so each distinct +// request shape warns at most once per process lifetime. storeName is used only to make the +// message specific (e.g. "Cassandra", "Valkey"). configuredVersion is the effective read +// version, surfaced in the message to help operators compare against materialization. +func warnPotentialEntityKeyVersionMismatch(warned *sync.Map, storeName string, configuredVersion int64, views []string, numKeys int) { + if len(views) == 0 { + return + } + sortedViews := append([]string(nil), views...) + sort.Strings(sortedViews) + dedupKey := strings.Join(sortedViews, ",") + if _, alreadyWarned := warned.LoadOrStore(dedupKey, struct{}{}); alreadyWarned { + return + } + log.Warn().Msgf( + "%s online read returned no data for all %d entity key(s) across every requested "+ + "feature view %v. This is not necessarily a problem — it can simply mean the requested "+ + "entities are not present (e.g. not yet materialized or expired). However, if data was "+ + "expected, it may indicate an entity_key_serialization_version mismatch between "+ + "materialization (write) and the feature server (read). If so, verify that "+ + "entity_key_serialization_version in feature_store.yaml (current configured read version: "+ + "%d) matches the version used to materialize these feature views.", + storeName, numKeys, sortedViews, configuredVersion) +} From ecf5c5ef85c99af32f511251fcc106ecbd81d850 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Fri, 26 Jun 2026 14:49:53 -0500 Subject: [PATCH 2/3] fix: use explicit time range in integration test materialization The test data parquet files have timestamps from April 2025. When using materialize-incremental with TTL=0 feature views, Feast defaults to a 1-year lookback window. Once the current date passed April 2026, the materialization found 0 rows to ingest, causing all Go integration tests to fail. Switch from materialize-incremental to materialize with an explicit start time of 2025-01-01 to ensure test data is always included. Co-Authored-By: Claude Sonnet 4.6 --- go/internal/test/go_integration_test_utils.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/go/internal/test/go_integration_test_utils.go b/go/internal/test/go_integration_test_utils.go index c59a9469061..ee5935d9304 100644 --- a/go/internal/test/go_integration_test_utils.go +++ b/go/internal/test/go_integration_test_utils.go @@ -274,11 +274,13 @@ func SetupInitializedRepo(basePath string) error { } t := time.Now() - formattedTime := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d", + formattedEndTime := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) - materializeCommand := exec.Command(feastExec, "materialize-incremental", formattedTime) + // Use an explicit start time far enough in the past to include test data. + // Test parquet files have timestamps from April 2025, so we start from 2025-01-01. + materializeCommand := exec.Command(feastExec, "materialize", "2025-01-01T00:00:00", formattedEndTime) materializeCommand.Env = os.Environ() materializeCommand.Dir = featureRepoPath out, err = materializeCommand.CombinedOutput() From 763ceec421e8d395debd8f1786b77015e52d39da Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Fri, 26 Jun 2026 17:16:44 -0500 Subject: [PATCH 3/3] fix: correct materialization time range for test data The test parquet file go/internal/test/feature_repo/driver_stats.parquet contains timestamps from 2021-2022, not 2025. The previous commit used 2025-01-01 as the start date which excluded all test data. Co-Authored-By: Claude Opus 4.5 --- go/internal/test/go_integration_test_utils.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/internal/test/go_integration_test_utils.go b/go/internal/test/go_integration_test_utils.go index ee5935d9304..b23afdfec97 100644 --- a/go/internal/test/go_integration_test_utils.go +++ b/go/internal/test/go_integration_test_utils.go @@ -279,8 +279,8 @@ func SetupInitializedRepo(basePath string) error { t.Hour(), t.Minute(), t.Second()) // Use an explicit start time far enough in the past to include test data. - // Test parquet files have timestamps from April 2025, so we start from 2025-01-01. - materializeCommand := exec.Command(feastExec, "materialize", "2025-01-01T00:00:00", formattedEndTime) + // Test parquet files have timestamps from 2021-2022, so we start from 2021-01-01. + materializeCommand := exec.Command(feastExec, "materialize", "2021-01-01T00:00:00", formattedEndTime) materializeCommand.Env = os.Environ() materializeCommand.Dir = featureRepoPath out, err = materializeCommand.CombinedOutput()