Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions go/internal/feast/onlinestore/cassandraonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/feast-dev/feast/go/internal/feast/model"
Expand Down Expand Up @@ -45,6 +46,8 @@ type CassandraOnlineStore struct {

// Caches table names instead of generating the table name every time
tableNameCache sync.Map

versionMismatchWarned sync.Map
}

type CassandraConfig struct {
Expand Down Expand Up @@ -393,7 +396,7 @@ func (c *CassandraOnlineStore) buildCassandraEntityKeys(entityKeys []*types.Enti
cassandraKeys := make([]any, len(entityKeys))
cassandraKeyToEntityIndex := make(map[string]int)
for i := 0; i < len(entityKeys); i++ {
var key, err = utils.SerializeEntityKey(entityKeys[i], 2)
var key, err = utils.SerializeEntityKey(entityKeys[i], c.resolvedEntityKeySerializationVersion())
if err != nil {
return nil, nil, err
}
Expand All @@ -404,6 +407,15 @@ func (c *CassandraOnlineStore) buildCassandraEntityKeys(entityKeys []*types.Enti
return cassandraKeys, cassandraKeyToEntityIndex, nil
}

func (c *CassandraOnlineStore) resolvedEntityKeySerializationVersion() int64 {
return resolveEntityKeySerializationVersion(c.config)
}

func (c *CassandraOnlineStore) warnPotentialVersionMismatch(views []string, numKeys int) {
warnPotentialEntityKeyVersionMismatch(
&c.versionMismatchWarned, "Cassandra", c.resolvedEntityKeySerializationVersion(), views, numKeys)
}

func (c *CassandraOnlineStore) validateUniqueFeatureNames(featureViewNames []string) error {
uniqueNames := make(map[string]int32)
for _, fvName := range featureViewNames {
Expand Down Expand Up @@ -499,7 +511,9 @@ func (c *CassandraOnlineStore) executeBatchV2(
var deserializedValue *types.Value

batchFeatures := make(map[string]map[string]*FeatureData)
rowsScanned := 0
for scanner.Next() {
rowsScanned++
err := scanner.Scan(&entityKey, &featureName, &eventTs, &valueStr)
if err != nil {
return nil, fmt.Errorf("could not read row in query for (entity key, feature name, value, event ts): %w", err)
Expand Down Expand Up @@ -530,6 +544,15 @@ func (c *CassandraOnlineStore) executeBatchV2(
return nil, fmt.Errorf("failed to scan features: %w", err)
}

// OnlineReadV2 issues a single batch that covers this feature view's full set of entity
// keys, so rowsScanned == 0 here means the entire request missed for the only view
// involved. That is exactly the "all requested views missed" condition, so warning
// directly from this single-batch path is safe (unlike the V1 OnlineRead path, which
// splits keys across batches and must aggregate results before deciding).
if rowsScanned == 0 {
c.warnPotentialVersionMismatch([]string{job.ViewName}, len(job.EntityKeys))
}

for _, serializedEntityKey := range job.EntityKeys {
for _, featName := range job.FeatureNames {
keyString := serializedEntityKey.(string)
Expand Down Expand Up @@ -591,6 +614,10 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ

batches := c.createBatches(serializedEntityKeys)

// viewsWithData records which feature views had at least one row returned by Cassandra.
// Used after all batches complete to detect per-view complete misses (possible version mismatch).
viewsWithData := &sync.Map{}

g.Go(func() error {
defer close(jobsChan)

Expand Down Expand Up @@ -635,7 +662,7 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ
for job := range jobsChan {
g.Go(func(j BatchJob) func() error {
return func() error {
return c.executeBatch(ctx, j, serializedEntityKeyToIndex, results, featureNamesToIdx)
return c.executeBatch(ctx, j, serializedEntityKeyToIndex, results, featureNamesToIdx, viewsWithData)
}
}(job))
}
Expand All @@ -644,6 +671,20 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ
return nil, err
}

// After all batches: only warn if EVERY requested feature view returned zero rows. A
// single view missing is normal (sparse/cold entities); a complete miss across the whole
// request is a stronger — though still not conclusive — signal of a possible
// serialization version mismatch. Warn once (deduped) for the full set of views.
missedViews := make([]string, 0, len(viewGroups))
for viewName := range viewGroups {
if _, hasData := viewsWithData.Load(viewName); !hasData {
missedViews = append(missedViews, viewName)
}
}
if len(viewGroups) > 0 && len(missedViews) == len(viewGroups) {
c.warnPotentialVersionMismatch(missedViews, len(entityKeys))
}

return results, nil
}

Expand All @@ -653,6 +694,7 @@ func (c *CassandraOnlineStore) executeBatch(
serializedEntityKeyToIndex map[string]int,
results [][]FeatureData,
featureNamesToIdx map[string]int,
viewsWithData *sync.Map,
) error {
iter := c.session.Query(job.CQLStatement, job.EntityKeys...).WithContext(ctx).Iter()
defer iter.Close()
Expand All @@ -665,7 +707,9 @@ func (c *CassandraOnlineStore) executeBatch(
var deserializedValue *types.Value

batchFeatures := make(map[string]map[string]*FeatureData)
rowsScanned := 0
for scanner.Next() {
rowsScanned++
err := scanner.Scan(&entityKey, &featureName, &eventTs, &valueStr)
if err != nil {
return fmt.Errorf("could not read row in query for (entity key, feature name, value, event ts): %w", err)
Expand Down Expand Up @@ -696,6 +740,12 @@ func (c *CassandraOnlineStore) executeBatch(
return fmt.Errorf("failed to scan features: %w", err)
}

// Mark this feature view as having returned data so OnlineRead can determine
// per-view whether a complete miss occurred across all batches.
if rowsScanned > 0 && viewsWithData != nil {
viewsWithData.Store(job.ViewName, struct{}{})
}

for _, serializedEntityKey := range job.EntityKeys {
for _, featName := range job.FeatureNames {
keyString := serializedEntityKey.(string)
Expand Down Expand Up @@ -914,6 +964,11 @@ func (c *CassandraOnlineStore) OnlineReadRange(ctx context.Context, groupedRefs
var waitGroup sync.WaitGroup
errorsChannel := make(chan error, nBatches)

// sawAnyRow is set to true by the first goroutine that returns at least one row.
// If it remains false after all batches finish, every entity key got a complete miss —
// a possible indicator of a serialization version mismatch.
var sawAnyRow atomic.Bool

canonicalFeats := make([]string, len(groupedRefs.FeatureNames))
isSortKey := make([]bool, len(groupedRefs.FeatureNames))
for i, name := range groupedRefs.FeatureNames {
Expand Down Expand Up @@ -950,6 +1005,7 @@ func (c *CassandraOnlineStore) OnlineReadRange(ctx context.Context, groupedRefs
continue
}

sawAnyRow.Store(true)
rowData := results[rowIdx]

for i, featName := range groupedRefs.FeatureNames {
Expand Down Expand Up @@ -1011,6 +1067,12 @@ func (c *CassandraOnlineStore) OnlineReadRange(ctx context.Context, groupedRefs
return nil, errors.Join(allErrors...)
}

// OnlineReadRange handles a single feature view, so a complete miss across all batches is
// the "all requested views missed" condition. Warn once if this looks like a version mismatch.
if !sawAnyRow.Load() {
c.warnPotentialVersionMismatch([]string{prepCtx.featureViewName}, len(groupedRefs.EntityKeys))
}

return results, nil
}

Expand Down
127 changes: 127 additions & 0 deletions go/internal/feast/onlinestore/cassandraonlinestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
package onlinestore

import (
"encoding/hex"
"fmt"
"reflect"
"testing"

"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/feast-dev/feast/go/internal/feast/utils"
"github.com/feast-dev/feast/go/protos/feast/core"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/gocql/gocql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -539,3 +543,126 @@ func TestResolveFeatureValue_AlreadyLowercaseFeature(t *testing.T) {
assert.Equal(t, serving.FieldStatus_PRESENT, status)
assert.Equal(t, "hello", val.(*types.Value).GetStringVal())
}

func sampleEntityKey() *types.EntityKey {
return &types.EntityKey{
JoinKeys: []string{"user_id"},
EntityValues: []*types.Value{
{Val: &types.Value_Int64Val{Int64Val: 42}},
},
}
}

func serializedHex(t *testing.T, key *types.EntityKey, version int64) string {
t.Helper()
b, err := utils.SerializeEntityKey(key, version)
require.NoError(t, err)
return hex.EncodeToString(*b)
}

func TestBuildCassandraEntityKeys_UsesConfiguredVersion(t *testing.T) {
key := sampleEntityKey()

for _, tc := range []struct {
name string
version int64
}{
{"version2", 2},
{"version3", 3},
} {
t.Run(tc.name, func(t *testing.T) {
store := &CassandraOnlineStore{
config: &registry.RepoConfig{
EntityKeySerializationVersion: tc.version,
},
}

cassandraKeys, keyToIdx, err := store.buildCassandraEntityKeys([]*types.EntityKey{key})
require.NoError(t, err)
require.Len(t, cassandraKeys, 1)

gotHex := cassandraKeys[0].(string)
wantHex := serializedHex(t, key, tc.version)

assert.Equal(t, wantHex, gotHex,
"hex key for version %d should match utils.SerializeEntityKey output", tc.version)
assert.Equal(t, 0, keyToIdx[gotHex])
})
}

storeV2 := &CassandraOnlineStore{config: &registry.RepoConfig{EntityKeySerializationVersion: 2}}
storeV3 := &CassandraOnlineStore{config: &registry.RepoConfig{EntityKeySerializationVersion: 3}}
keysV2, _, _ := storeV2.buildCassandraEntityKeys([]*types.EntityKey{key})
keysV3, _, _ := storeV3.buildCassandraEntityKeys([]*types.EntityKey{key})
assert.NotEqual(t, keysV2[0], keysV3[0],
"v2 and v3 serialized keys must differ for the same entity key")
}

func TestBuildCassandraEntityKeys_ZeroVersionDefaultsToV3(t *testing.T) {
key := sampleEntityKey()

store := &CassandraOnlineStore{
config: &registry.RepoConfig{EntityKeySerializationVersion: 0},
}
keys, _, err := store.buildCassandraEntityKeys([]*types.EntityKey{key})
require.NoError(t, err)

wantHex := serializedHex(t, key, 3)
assert.Equal(t, wantHex, keys[0].(string), "zero version should produce v3 keys")
}

func TestResolvedEntityKeySerializationVersion(t *testing.T) {
assert.Equal(t, int64(3), (&CassandraOnlineStore{config: &registry.RepoConfig{EntityKeySerializationVersion: 0}}).resolvedEntityKeySerializationVersion())
assert.Equal(t, int64(2), (&CassandraOnlineStore{config: &registry.RepoConfig{EntityKeySerializationVersion: 2}}).resolvedEntityKeySerializationVersion())
assert.Equal(t, int64(3), (&CassandraOnlineStore{config: &registry.RepoConfig{EntityKeySerializationVersion: 3}}).resolvedEntityKeySerializationVersion())
assert.Equal(t, int64(3), (&CassandraOnlineStore{config: nil}).resolvedEntityKeySerializationVersion(), "nil config should default to v3 without panicking")
}

func TestWarnPotentialVersionMismatch_DeduplicatesPerRequest(t *testing.T) {
store := &CassandraOnlineStore{
config: &registry.RepoConfig{EntityKeySerializationVersion: 3},
}

const view = "my_feature_view"

store.warnPotentialVersionMismatch([]string{view}, 5)
_, warned := store.versionMismatchWarned.Load(view)
assert.True(t, warned, "view should be recorded after first call")

store.warnPotentialVersionMismatch([]string{view}, 5)
count := 0
store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true })
assert.Equal(t, 1, count, "only one entry should exist for the view after repeated calls")

const otherView = "other_feature_view"
store.warnPotentialVersionMismatch([]string{otherView}, 3)
count = 0
store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true })
assert.Equal(t, 2, count, "each distinct request should have its own warning entry")
}

func TestWarnPotentialVersionMismatch_MultiViewDedupIsOrderIndependent(t *testing.T) {
store := &CassandraOnlineStore{
config: &registry.RepoConfig{EntityKeySerializationVersion: 3},
}

store.warnPotentialVersionMismatch([]string{"view_a", "view_b"}, 4)
store.warnPotentialVersionMismatch([]string{"view_b", "view_a"}, 4)

count := 0
store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true })
assert.Equal(t, 1, count, "same set of views in any order should dedupe to one entry")
}

func TestWarnPotentialVersionMismatch_EmptyViewsIsNoop(t *testing.T) {
store := &CassandraOnlineStore{
config: &registry.RepoConfig{EntityKeySerializationVersion: 3},
}

store.warnPotentialVersionMismatch(nil, 5)
store.warnPotentialVersionMismatch([]string{}, 5)

count := 0
store.versionMismatchWarned.Range(func(_, _ any) bool { count++; return true })
assert.Equal(t, 0, count, "no warning entry should be recorded for an empty view set")
}
Loading
Loading