From 6044c0b6057e547b057e62d6921ee8a8b57832d2 Mon Sep 17 00:00:00 2001 From: piket Date: Thu, 25 Jun 2026 13:15:41 -0700 Subject: [PATCH 1/2] fix: preserve millisecond precision for UNIX_TIMESTAMP sort keys (EAPC-22316) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SortedFeatureView rows whose UNIX_TIMESTAMP sort keys differed by less than one second were being assigned the same sort key score in Redis/Valkey (wrong ordering) and the same Cassandra clustering key (row overwrites). Root cause: _python_datetime_to_int_timestamp() truncated all UNIX_TIMESTAMP values to integer seconds before storing in unix_timestamp_val. The * 1000 multiplications in zset_score() attempted ms precision but had no effect. Fix: - Add _python_datetime_to_int_ms_timestamp() for millisecond conversion. - In _convert_arrow_fv_to_proto(), detect UNIX_TIMESTAMP sort key columns and use the ms function for those columns only; all other UNIX_TIMESTAMP columns remain at second precision (backward compatible). - Remove the * 1000 from zset_score() in Redis, Valkey, and Cassandra — unix_timestamp_val is now already ms for sort key columns. - Add a threshold discriminator (val > 1e11) in feast_value_type_to_python_type() so sort key ms values are correctly read back as datetimes without affecting the seconds interpretation for regular feature columns. Co-Authored-By: Claude Sonnet 4.6 --- .../cassandra_online_store.py | 12 ++---- .../feast/infra/online_stores/eg_valkey.py | 4 +- sdk/python/feast/infra/online_stores/redis.py | 4 +- sdk/python/feast/type_map.py | 38 +++++++++++++++++-- sdk/python/feast/utils.py | 29 +++++++++++--- .../unit/infra/online_store/test_redis.py | 2 +- .../unit/infra/online_store/test_valkey.py | 2 +- 7 files changed, 66 insertions(+), 25 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py index 06b0f3ac4d7..e3aee93675a 100644 --- a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py @@ -538,18 +538,14 @@ def on_failure(exc, concurrent_queue): if feature_name in sort_key_names: feast_value_type = valProto.WhichOneof("val") if feast_value_type == "unix_timestamp_val": - feature_value = ( - valProto.unix_timestamp_val * 1000 - ) # Convert to milliseconds + feature_value = valProto.unix_timestamp_val # already milliseconds elif feast_value_type is None: feature_value = None elif feast_value_type in feast_array_types: if feast_value_type == "unix_timestamp_list_val": - # Convert list of timestamps to milliseconds - feature_value = [ - ts * 1000 - for ts in valProto.unix_timestamp_list_val.val # type:ignore - ] + feature_value = list( + valProto.unix_timestamp_list_val.val # type:ignore + ) # already milliseconds else: feature_value = getattr( valProto, str(feast_value_type) diff --git a/sdk/python/feast/infra/online_stores/eg_valkey.py b/sdk/python/feast/infra/online_stores/eg_valkey.py index 93f147f51b8..01f5bdf51f4 100644 --- a/sdk/python/feast/infra/online_stores/eg_valkey.py +++ b/sdk/python/feast/infra/online_stores/eg_valkey.py @@ -774,9 +774,7 @@ def zset_score(sort_key_value: ValueProto): """ feast_value_type = sort_key_value.WhichOneof("val") if feast_value_type == "unix_timestamp_val": - feature_value = ( - sort_key_value.unix_timestamp_val * 1000 - ) # Convert to milliseconds + feature_value = sort_key_value.unix_timestamp_val # already milliseconds else: feature_value = getattr(sort_key_value, str(feast_value_type)) return feature_value diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 8d3d3185ca5..06e90c25586 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -500,9 +500,7 @@ def zset_score(sort_key_value: ValueProto): """ feast_value_type = sort_key_value.WhichOneof("val") if feast_value_type == "unix_timestamp_val": - feature_value = ( - sort_key_value.unix_timestamp_val * 1000 - ) # Convert to milliseconds + feature_value = sort_key_value.unix_timestamp_val # already milliseconds else: feature_value = getattr(sort_key_value, str(feast_value_type)) return feature_value diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index ebf6f0eae19..209bb200d54 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -78,11 +78,16 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: if hasattr(val, "val"): val = list(val.val) - # Convert UNIX_TIMESTAMP values to `datetime` + # Convert UNIX_TIMESTAMP values to `datetime`. + # unix_timestamp_val stores seconds for regular features and milliseconds for sort key + # columns. Values > 1e11 are unambiguously ms (current-era seconds ~1.7e9, ms ~1.7e12; + # threshold is safe until year ~5138 in seconds). if val_attr == "unix_timestamp_list_val": val = [ ( - datetime.fromtimestamp(v, tz=timezone.utc) + datetime.fromtimestamp( + v / 1000.0 if v > 1e11 else float(v), tz=timezone.utc + ) if v != NULL_TIMESTAMP_INT_VALUE else None ) @@ -90,7 +95,9 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: ] elif val_attr == "unix_timestamp_val": val = ( - datetime.fromtimestamp(val, tz=timezone.utc) + datetime.fromtimestamp( + val / 1000.0 if val > 1e11 else float(val), tz=timezone.utc + ) if val != NULL_TIMESTAMP_INT_VALUE else None ) @@ -358,6 +365,31 @@ def _python_datetime_to_int_timestamp( return int_timestamps +def _python_datetime_to_int_ms_timestamp( + values: Sequence[Any], +) -> Sequence[Union[int, np.int_]]: + """Convert datetime values to milliseconds since epoch (used for sort key columns).""" + # Fast path for Numpy array. + if isinstance(values, np.ndarray) and isinstance(values.dtype, np.datetime64): + if values.ndim != 1: + raise ValueError("Only 1 dimensional arrays are supported.") + return cast(Sequence[np.int_], values.astype("datetime64[ms]").astype(np.int_)) + + int_timestamps = [] + for value in values: + if isinstance(value, datetime): + int_timestamps.append(int(round(value.timestamp() * 1000))) + elif isinstance(value, Timestamp): + int_timestamps.append(int(value.ToMilliseconds())) + elif isinstance(value, np.datetime64): + int_timestamps.append(value.astype("datetime64[ms]").astype(np.int_)) # type: ignore[attr-defined] + elif isinstance(value, type(np.nan)): + int_timestamps.append(NULL_TIMESTAMP_INT_VALUE) + else: + int_timestamps.append(int(value)) + return int_timestamps + + def _python_value_to_proto_value( feast_value_type: ValueType, values: List[Any] ) -> List[ProtoValue]: diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index af55266c682..2a8ee42625b 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -42,7 +42,10 @@ from feast.protos.feast.types.Value_pb2 import FloatList as FloatListProto from feast.protos.feast.types.Value_pb2 import RepeatedValue as RepeatedValueProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.type_map import python_values_to_proto_values +from feast.type_map import ( + _python_datetime_to_int_ms_timestamp, + python_values_to_proto_values, +) from feast.types import ComplexFeastType, PrimitiveFeastType, from_feast_to_pyarrow_type from feast.value_type import ValueType from feast.version import get_version @@ -281,13 +284,27 @@ def _convert_arrow_fv_to_proto( (field.name, field.dtype.to_value_type()) for field in feature_view.features ] + list(join_keys.items()) - proto_values_by_column = { - column: python_values_to_proto_values( - table.column(column).to_numpy(zero_copy_only=False), value_type - ) - for column, value_type in columns + # Sort key UNIX_TIMESTAMP columns use ms precision so that timestamps differing + # by < 1 second produce distinct sort key scores in the online store. + sort_key_ts_names = { + sk.name + for sk in getattr(feature_view, "sort_keys", []) + if sk.value_type == ValueType.UNIX_TIMESTAMP } + proto_values_by_column = {} + for column, value_type in columns: + raw = table.column(column).to_numpy(zero_copy_only=False) + if column in sort_key_ts_names: + ms_vals = _python_datetime_to_int_ms_timestamp(raw) + proto_values_by_column[column] = [ + ValueProto(unix_timestamp_val=int(ts)) for ts in ms_vals + ] + else: + proto_values_by_column[column] = python_values_to_proto_values( + raw, value_type + ) + entity_keys = [ EntityKeyProto( join_keys=join_keys, diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py index e2aad12bd7e..4bacd876f44 100644 --- a/sdk/python/tests/unit/infra/online_store/test_redis.py +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -508,7 +508,7 @@ def _make_rows(n=10): { "trip_id": ValueProto(int32_val=i), "rating": ValueProto(float_val=i + 0.5), - "event_timestamp": ValueProto(unix_timestamp_val=base_ts + i * 60), + "event_timestamp": ValueProto(unix_timestamp_val=(base_ts + i * 60) * 1000), }, datetime.fromtimestamp(base_ts + i * 60, tz=timezone.utc), None, diff --git a/sdk/python/tests/unit/infra/online_store/test_valkey.py b/sdk/python/tests/unit/infra/online_store/test_valkey.py index ed911bcb92a..5a78cd28dc4 100644 --- a/sdk/python/tests/unit/infra/online_store/test_valkey.py +++ b/sdk/python/tests/unit/infra/online_store/test_valkey.py @@ -402,7 +402,7 @@ def _make_rows(n=10): { "trip_id": ValueProto(int32_val=i), "rating": ValueProto(float_val=i + 0.5), - "event_timestamp": ValueProto(unix_timestamp_val=base_ts + i * 60), + "event_timestamp": ValueProto(unix_timestamp_val=(base_ts + i * 60) * 1000), }, datetime.fromtimestamp(base_ts + i * 60, tz=timezone.utc), None, From 40f98c2f6b8f7dc499b026c044594d63f7d13374 Mon Sep 17 00:00:00 2001 From: piket Date: Thu, 25 Jun 2026 13:33:35 -0700 Subject: [PATCH 2/2] style: apply ruff formatting Co-Authored-By: Claude Sonnet 4.6 --- .../cassandra_online_store/cassandra_online_store.py | 4 +++- sdk/python/tests/unit/infra/online_store/test_redis.py | 4 +++- sdk/python/tests/unit/infra/online_store/test_valkey.py | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py index e3aee93675a..631adc11abc 100644 --- a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py @@ -538,7 +538,9 @@ def on_failure(exc, concurrent_queue): if feature_name in sort_key_names: feast_value_type = valProto.WhichOneof("val") if feast_value_type == "unix_timestamp_val": - feature_value = valProto.unix_timestamp_val # already milliseconds + feature_value = ( + valProto.unix_timestamp_val + ) # already milliseconds elif feast_value_type is None: feature_value = None elif feast_value_type in feast_array_types: diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py index 4bacd876f44..372cf97923c 100644 --- a/sdk/python/tests/unit/infra/online_store/test_redis.py +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -508,7 +508,9 @@ def _make_rows(n=10): { "trip_id": ValueProto(int32_val=i), "rating": ValueProto(float_val=i + 0.5), - "event_timestamp": ValueProto(unix_timestamp_val=(base_ts + i * 60) * 1000), + "event_timestamp": ValueProto( + unix_timestamp_val=(base_ts + i * 60) * 1000 + ), }, datetime.fromtimestamp(base_ts + i * 60, tz=timezone.utc), None, diff --git a/sdk/python/tests/unit/infra/online_store/test_valkey.py b/sdk/python/tests/unit/infra/online_store/test_valkey.py index 5a78cd28dc4..2af1e8060a1 100644 --- a/sdk/python/tests/unit/infra/online_store/test_valkey.py +++ b/sdk/python/tests/unit/infra/online_store/test_valkey.py @@ -402,7 +402,9 @@ def _make_rows(n=10): { "trip_id": ValueProto(int32_val=i), "rating": ValueProto(float_val=i + 0.5), - "event_timestamp": ValueProto(unix_timestamp_val=(base_ts + i * 60) * 1000), + "event_timestamp": ValueProto( + unix_timestamp_val=(base_ts + i * 60) * 1000 + ), }, datetime.fromtimestamp(base_ts + i * 60, tz=timezone.utc), None,