diff --git a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py index 06b0f3ac4d7..631adc11abc 100644 --- a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py @@ -539,17 +539,15 @@ def on_failure(exc, concurrent_queue): feast_value_type = valProto.WhichOneof("val") if feast_value_type == "unix_timestamp_val": feature_value = ( - valProto.unix_timestamp_val * 1000 - ) # Convert to milliseconds + valProto.unix_timestamp_val + ) # already milliseconds elif feast_value_type is None: feature_value = None elif feast_value_type in feast_array_types: if feast_value_type == "unix_timestamp_list_val": - # Convert list of timestamps to milliseconds - feature_value = [ - ts * 1000 - for ts in valProto.unix_timestamp_list_val.val # type:ignore - ] + feature_value = list( + valProto.unix_timestamp_list_val.val # type:ignore + ) # already milliseconds else: feature_value = getattr( valProto, str(feast_value_type) diff --git a/sdk/python/feast/infra/online_stores/eg_valkey.py b/sdk/python/feast/infra/online_stores/eg_valkey.py index 93f147f51b8..01f5bdf51f4 100644 --- a/sdk/python/feast/infra/online_stores/eg_valkey.py +++ b/sdk/python/feast/infra/online_stores/eg_valkey.py @@ -774,9 +774,7 @@ def zset_score(sort_key_value: ValueProto): """ feast_value_type = sort_key_value.WhichOneof("val") if feast_value_type == "unix_timestamp_val": - feature_value = ( - sort_key_value.unix_timestamp_val * 1000 - ) # Convert to milliseconds + feature_value = sort_key_value.unix_timestamp_val # already milliseconds else: feature_value = getattr(sort_key_value, str(feast_value_type)) return feature_value diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 8d3d3185ca5..06e90c25586 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -500,9 +500,7 @@ def zset_score(sort_key_value: ValueProto): """ feast_value_type = sort_key_value.WhichOneof("val") if feast_value_type == "unix_timestamp_val": - feature_value = ( - sort_key_value.unix_timestamp_val * 1000 - ) # Convert to milliseconds + feature_value = sort_key_value.unix_timestamp_val # already milliseconds else: feature_value = getattr(sort_key_value, str(feast_value_type)) return feature_value diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index ebf6f0eae19..209bb200d54 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -78,11 +78,16 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: if hasattr(val, "val"): val = list(val.val) - # Convert UNIX_TIMESTAMP values to `datetime` + # Convert UNIX_TIMESTAMP values to `datetime`. + # unix_timestamp_val stores seconds for regular features and milliseconds for sort key + # columns. Values > 1e11 are unambiguously ms (current-era seconds ~1.7e9, ms ~1.7e12; + # threshold is safe until year ~5138 in seconds). if val_attr == "unix_timestamp_list_val": val = [ ( - datetime.fromtimestamp(v, tz=timezone.utc) + datetime.fromtimestamp( + v / 1000.0 if v > 1e11 else float(v), tz=timezone.utc + ) if v != NULL_TIMESTAMP_INT_VALUE else None ) @@ -90,7 +95,9 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: ] elif val_attr == "unix_timestamp_val": val = ( - datetime.fromtimestamp(val, tz=timezone.utc) + datetime.fromtimestamp( + val / 1000.0 if val > 1e11 else float(val), tz=timezone.utc + ) if val != NULL_TIMESTAMP_INT_VALUE else None ) @@ -358,6 +365,31 @@ def _python_datetime_to_int_timestamp( return int_timestamps +def _python_datetime_to_int_ms_timestamp( + values: Sequence[Any], +) -> Sequence[Union[int, np.int_]]: + """Convert datetime values to milliseconds since epoch (used for sort key columns).""" + # Fast path for Numpy array. + if isinstance(values, np.ndarray) and isinstance(values.dtype, np.datetime64): + if values.ndim != 1: + raise ValueError("Only 1 dimensional arrays are supported.") + return cast(Sequence[np.int_], values.astype("datetime64[ms]").astype(np.int_)) + + int_timestamps = [] + for value in values: + if isinstance(value, datetime): + int_timestamps.append(int(round(value.timestamp() * 1000))) + elif isinstance(value, Timestamp): + int_timestamps.append(int(value.ToMilliseconds())) + elif isinstance(value, np.datetime64): + int_timestamps.append(value.astype("datetime64[ms]").astype(np.int_)) # type: ignore[attr-defined] + elif isinstance(value, type(np.nan)): + int_timestamps.append(NULL_TIMESTAMP_INT_VALUE) + else: + int_timestamps.append(int(value)) + return int_timestamps + + def _python_value_to_proto_value( feast_value_type: ValueType, values: List[Any] ) -> List[ProtoValue]: diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index af55266c682..2a8ee42625b 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -42,7 +42,10 @@ from feast.protos.feast.types.Value_pb2 import FloatList as FloatListProto from feast.protos.feast.types.Value_pb2 import RepeatedValue as RepeatedValueProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.type_map import python_values_to_proto_values +from feast.type_map import ( + _python_datetime_to_int_ms_timestamp, + python_values_to_proto_values, +) from feast.types import ComplexFeastType, PrimitiveFeastType, from_feast_to_pyarrow_type from feast.value_type import ValueType from feast.version import get_version @@ -281,13 +284,27 @@ def _convert_arrow_fv_to_proto( (field.name, field.dtype.to_value_type()) for field in feature_view.features ] + list(join_keys.items()) - proto_values_by_column = { - column: python_values_to_proto_values( - table.column(column).to_numpy(zero_copy_only=False), value_type - ) - for column, value_type in columns + # Sort key UNIX_TIMESTAMP columns use ms precision so that timestamps differing + # by < 1 second produce distinct sort key scores in the online store. + sort_key_ts_names = { + sk.name + for sk in getattr(feature_view, "sort_keys", []) + if sk.value_type == ValueType.UNIX_TIMESTAMP } + proto_values_by_column = {} + for column, value_type in columns: + raw = table.column(column).to_numpy(zero_copy_only=False) + if column in sort_key_ts_names: + ms_vals = _python_datetime_to_int_ms_timestamp(raw) + proto_values_by_column[column] = [ + ValueProto(unix_timestamp_val=int(ts)) for ts in ms_vals + ] + else: + proto_values_by_column[column] = python_values_to_proto_values( + raw, value_type + ) + entity_keys = [ EntityKeyProto( join_keys=join_keys, diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py index e2aad12bd7e..372cf97923c 100644 --- a/sdk/python/tests/unit/infra/online_store/test_redis.py +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -508,7 +508,9 @@ def _make_rows(n=10): { "trip_id": ValueProto(int32_val=i), "rating": ValueProto(float_val=i + 0.5), - "event_timestamp": ValueProto(unix_timestamp_val=base_ts + i * 60), + "event_timestamp": ValueProto( + unix_timestamp_val=(base_ts + i * 60) * 1000 + ), }, datetime.fromtimestamp(base_ts + i * 60, tz=timezone.utc), None, diff --git a/sdk/python/tests/unit/infra/online_store/test_valkey.py b/sdk/python/tests/unit/infra/online_store/test_valkey.py index ed911bcb92a..2af1e8060a1 100644 --- a/sdk/python/tests/unit/infra/online_store/test_valkey.py +++ b/sdk/python/tests/unit/infra/online_store/test_valkey.py @@ -402,7 +402,9 @@ def _make_rows(n=10): { "trip_id": ValueProto(int32_val=i), "rating": ValueProto(float_val=i + 0.5), - "event_timestamp": ValueProto(unix_timestamp_val=base_ts + i * 60), + "event_timestamp": ValueProto( + unix_timestamp_val=(base_ts + i * 60) * 1000 + ), }, datetime.fromtimestamp(base_ts + i * 60, tz=timezone.utc), None,