From 8e90787397c8ddb9b9c51dbd1917d6a2de77532b Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 25 Jun 2026 00:31:11 +0000 Subject: [PATCH 1/3] feat(eap-items): read array attributes from typed map columns for recent queries Array attributes have been double-written into the typed `attributes_array_{string,int,float,bool}` map columns since 2026-06-22 (#8079). This switches the array-attribute filter read path to those typed columns for query windows new enough that the columns are fully populated, falling back to the legacy `attributes_array` JSON column otherwise. - Add `USE_ARRAY_MAP_COLUMNS_TIMESTAMP_SECONDS` (2026-06-23 00:00 UTC) and `use_array_map_columns(meta)`, mirroring `use_sampling_factor`. The cutoff is the day after the double-write began, so only data guaranteed to be present in the typed columns is read from them. A `use_array_map_columns_timestamp_seconds` state config of 0 disables the typed-column read path. - Add `type_array_to_membership_array_expression_from_typed_columns`, the typed-column counterpart of `type_array_to_membership_array_expression`. It builds a normalized `Array(String)` via `arrayConcat` over the string, float and bool columns so per-element comparisons keep matching. Ints are double-written into the float column (the read path resolves numerics to float), so the int column is not read to avoid duplicate elements. - Thread `use_array_map_columns` through `trace_item_filters_to_expression` and `_trace_item_filter_key_expression`, and pass `use_array_map_columns(meta)` at every resolver/endpoint filter call site. `get_field_existence_expression` treats the `arrayConcat` membership expression like `arrayMap` (notEmpty). The SELECT and aggregate array paths keep reading the always-written JSON column, so this change is read-consistent for any window. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01J9drsbsd3QyVPEFy8ALB7A --- snuba/protos/common.py | 56 +++++++ snuba/settings/__init__.py | 7 + snuba/web/rpc/common/common.py | 85 ++++++++-- snuba/web/rpc/v1/endpoint_get_traces.py | 15 +- .../web/rpc/v1/endpoint_trace_item_details.py | 7 +- .../resolvers/R_eap_items/heatmap_builder.py | 3 + .../R_eap_items/resolver_time_series.py | 5 +- .../R_eap_items/resolver_trace_item_stats.py | 2 + .../R_eap_items/resolver_trace_item_table.py | 2 + .../v1/resolvers/common/cross_item_queries.py | 3 + tests/web/rpc/test_common.py | 145 +++++++++++++++++- 11 files changed, 313 insertions(+), 17 deletions(-) diff --git a/snuba/protos/common.py b/snuba/protos/common.py index 3ae0b3e0fb6..27cd8659f09 100644 --- a/snuba/protos/common.py +++ b/snuba/protos/common.py @@ -196,6 +196,62 @@ def type_array_to_stored_array_json_path(attr_key: AttributeKey) -> JsonPath: ) +def type_array_to_membership_array_expression_from_typed_columns( + attr_key: AttributeKey, +) -> FunctionCall: + """WHERE-clause membership array built from the typed array map columns. + + Counterpart to ``type_array_to_membership_array_expression``, which reads the + legacy ``attributes_array`` JSON column. Since 2026-06-22 array attributes are + also double-written into typed ``Map(String, Array(T))`` columns; for query + windows new enough that those columns are fully populated (see + ``use_array_map_columns``) we read them instead. + + Returns a normalized ``Array(String)`` of every element across the string, + float, and bool columns so the per-element comparisons built by + ``_type_array_membership_rhs_expression`` keep matching the JSON-column + behaviour (string elements stay as-is, numbers become ``toString(...)``, and + bools become ``'true'``/``'false'``). Integer elements are double-written into + ``attributes_array_float`` (the read path resolves all numeric types to the + float column), so the int column is intentionally not read — including it + would duplicate every integer element. + """ + if attr_key.type != AttributeKey.Type.TYPE_ARRAY: + raise MalformedAttributeException( + f"type_array_to_membership_array_expression_from_typed_columns expected " + f"TYPE_ARRAY, got {AttributeKey.Type.Name(attr_key.type)}" + ) + alias = f"{_build_label_mapping_key(attr_key)}__array_members" + name = attr_key.name + x = Argument(None, "x") + string_elements = arrayElement(None, column("attributes_array_string"), literal(name)) + float_elements = FunctionCall( + None, + "arrayMap", + ( + Lambda(None, ("x",), FunctionCall(None, "toString", (x,))), + arrayElement(None, column("attributes_array_float"), literal(name)), + ), + ) + bool_elements = FunctionCall( + None, + "arrayMap", + ( + Lambda( + None, + ("x",), + FunctionCall(None, "if", (x, literal("true"), literal("false"))), + ), + arrayElement(None, column("attributes_array_bool"), literal(name)), + ), + ) + return FunctionCall( + alias=alias, + function_name="arrayConcat", + parameters=(string_elements, float_elements, bool_elements), + ) + + def attribute_key_to_expression(attr_key: AttributeKey) -> Expression: """Convert an AttributeKey proto to a Snuba Expression. diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index a1aebf43709..ba03ee74acf 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -249,6 +249,13 @@ class RedisClusters(TypedDict): # Represents 10AM PST April 8, 2025 which is the date we started writing the sampling factor. We can remove this setting once 90 days have passed since this date. USE_SAMPLING_FACTOR_TIMESTAMP_SECONDS = 1744131600 +# Represents 2026-06-23 00:00 UTC, the day after we began double-writing array +# attributes into the typed `attributes_array_*` map columns (2026-06-22). Only +# queries whose window starts on/after this read those typed columns; older data +# only exists in the legacy `attributes_array` JSON column. We can remove this +# setting once enough time has passed that all queryable data is double-written. +USE_ARRAY_MAP_COLUMNS_TIMESTAMP_SECONDS = 1782172800 + # Processor/Writer Options diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py index d196a66e802..d1799a3d83a 100644 --- a/snuba/web/rpc/common/common.py +++ b/snuba/web/rpc/common/common.py @@ -21,6 +21,7 @@ PROTO_TYPE_TO_CLICKHOUSE_TYPE, MalformedAttributeException, type_array_to_membership_array_expression, + type_array_to_membership_array_expression_from_typed_columns, ) from snuba.protos.common import ( attribute_key_to_expression as _attribute_key_to_expression, @@ -86,14 +87,22 @@ def attribute_key_to_expression(attr_key: AttributeKey) -> Expression: def _trace_item_filter_key_expression( - attr_to_key_expression_callable: Callable[[AttributeKey], Expression], key: AttributeKey + attr_to_key_expression_callable: Callable[[AttributeKey], Expression], + key: AttributeKey, + use_array_map_columns: bool = False, ) -> Expression: """predicates must use the normalized ``arrayMap`` (``type_array_to_membership_array_expression``) so ``arrayExists`` compares per element. It is different from SELECT predicate. + + When ``use_array_map_columns`` is set, array predicates read the typed + ``attributes_array_*`` map columns instead of the legacy ``attributes_array`` + JSON column (see ``use_array_map_columns``). """ if key.type == AttributeKey.Type.TYPE_ARRAY: try: + if use_array_map_columns: + return type_array_to_membership_array_expression_from_typed_columns(key) return type_array_to_membership_array_expression(key) except MalformedAttributeException as e: raise BadSnubaRPCRequestException(str(e)) from e @@ -248,6 +257,27 @@ def use_sampling_factor(meta: RequestMeta) -> bool: return meta.start_timestamp.seconds >= use_sampling_factor_timestamp_seconds +def use_array_map_columns(meta: RequestMeta) -> bool: + """ + Array attributes were double-written into the typed ``attributes_array_*`` map + columns starting 2026-06-22, so we should only read those columns on queries + whose window starts on/after the cutoff (2026-06-23 UTC by default). Older data + only exists in the legacy ``attributes_array`` JSON column. A config value of 0 + disables the typed-column read path entirely. + """ + use_array_map_columns_timestamp_seconds = cast( + int, + state.get_int_config( + "use_array_map_columns_timestamp_seconds", + settings.USE_ARRAY_MAP_COLUMNS_TIMESTAMP_SECONDS, + ), + ) + if use_array_map_columns_timestamp_seconds == 0: + return False + + return meta.start_timestamp.seconds >= use_array_map_columns_timestamp_seconds + + def treeify_or_and_conditions(query: Query) -> None: """ look for expressions like or(a, b, c) and turn them into or(a, or(b, c)) @@ -735,6 +765,7 @@ def trace_item_filters_to_expression( item_filter: TraceItemFilter, attribute_key_to_expression: Callable[[AttributeKey], Expression], membership_as_has: bool = False, + use_array_map_columns: bool = False, ) -> Expression: """ Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024") @@ -746,6 +777,10 @@ def trace_item_filters_to_expression( ``IN`` set leaks an unstable ``__set_*`` identifier into the result-block column name and breaks mixed-version distributed reads (see ``_in_or_has``). Leave the default for WHERE clauses, where the prepared ``IN`` set drives pruning. + :param use_array_map_columns: resolve ``TYPE_ARRAY`` predicates against the typed + ``attributes_array_*`` map columns instead of the legacy ``attributes_array`` + JSON column. Pass ``use_array_map_columns(meta)`` so this is only enabled for + query windows new enough that the typed columns are fully populated. :return: """ if item_filter.HasField("and_filter"): @@ -754,11 +789,19 @@ def trace_item_filters_to_expression( return literal(True) elif len(filters) == 1: return trace_item_filters_to_expression( - filters[0], attribute_key_to_expression, membership_as_has + filters[0], + attribute_key_to_expression, + membership_as_has, + use_array_map_columns, ) return and_cond( *( - trace_item_filters_to_expression(x, attribute_key_to_expression, membership_as_has) + trace_item_filters_to_expression( + x, + attribute_key_to_expression, + membership_as_has, + use_array_map_columns, + ) for x in filters ) ) @@ -769,11 +812,19 @@ def trace_item_filters_to_expression( raise BadSnubaRPCRequestException("Invalid trace item filter, empty 'or' clause") elif len(filters) == 1: return trace_item_filters_to_expression( - filters[0], attribute_key_to_expression, membership_as_has + filters[0], + attribute_key_to_expression, + membership_as_has, + use_array_map_columns, ) return or_cond( *( - trace_item_filters_to_expression(x, attribute_key_to_expression, membership_as_has) + trace_item_filters_to_expression( + x, + attribute_key_to_expression, + membership_as_has, + use_array_map_columns, + ) for x in filters ) ) @@ -785,14 +836,20 @@ def trace_item_filters_to_expression( elif len(filters) == 1: return not_cond( trace_item_filters_to_expression( - filters[0], attribute_key_to_expression, membership_as_has + filters[0], + attribute_key_to_expression, + membership_as_has, + use_array_map_columns, ) ) return not_cond( and_cond( *( trace_item_filters_to_expression( - x, attribute_key_to_expression, membership_as_has + x, + attribute_key_to_expression, + membership_as_has, + use_array_map_columns, ) for x in filters ) @@ -808,7 +865,9 @@ def trace_item_filters_to_expression( _validate_comparison_filter_type_array(op, v) k_expression = _trace_item_filter_key_expression( - attr_to_key_expression_callable=attribute_key_to_expression, key=k + attr_to_key_expression_callable=attribute_key_to_expression, + key=k, + use_array_map_columns=use_array_map_columns, ) value_type = v.WhichOneof("value") @@ -1074,6 +1133,7 @@ def trace_item_filters_to_expression( _trace_item_filter_key_expression( attr_to_key_expression_callable=attribute_key_to_expression, key=item_filter.exists_filter.key, + use_array_map_columns=use_array_map_columns, ) ) @@ -1180,9 +1240,12 @@ def get_subscriptable_field(field: Expression) -> SubscriptableReference | None: if isinstance(field, FunctionCall) and field.function_name == "arrayElement": return map_key_exists(field.parameters[0], field.parameters[1]) - if isinstance(field, FunctionCall) and field.function_name == "arrayMap": - # Array attributes in the JSON column return empty arrays (not NULL) - # for missing keys, so notEmpty is the correct existence check. + if isinstance(field, FunctionCall) and field.function_name in ("arrayMap", "arrayConcat"): + # Array attributes return empty arrays (not NULL) for missing keys, so + # notEmpty is the correct existence check. This covers both the JSON-column + # membership expression (arrayMap) and the typed-column one (arrayConcat of + # the per-type map lookups, see + # type_array_to_membership_array_expression_from_typed_columns). return f.notEmpty(field) return f.isNotNull(field) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index 7096690d012..7be2e8a4774 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -46,6 +46,7 @@ base_conditions_and, trace_item_filters_to_expression, treeify_or_and_conditions, + use_array_map_columns, ) from snuba.web.rpc.common.debug_info import ( extract_response_meta, @@ -556,7 +557,9 @@ def _is_cross_event_query( return len(set([f.item_type for f in filters])) > 1 def _get_trace_item_filter_expressions( - self, filters: RepeatedCompositeFieldContainer[GetTracesRequest.TraceFilter] + self, + filters: RepeatedCompositeFieldContainer[GetTracesRequest.TraceFilter], + request_meta: RequestMeta, ) -> dict[TraceItemType.ValueType, Expression]: """ Returns a dict mapping item types to a filter expression for that item type. @@ -584,6 +587,7 @@ def _get_trace_item_filter_expressions( ), attribute_key_to_expression, membership_as_has=True, + use_array_map_columns=use_array_map_columns(request_meta), ), ) @@ -607,6 +611,7 @@ def _get_trace_ids_for_single_item_query( ), ), attribute_key_to_expression, + use_array_map_columns=use_array_map_columns(request.meta), ) selected_columns: list[SelectedExpression] = [ SelectedExpression( @@ -671,7 +676,9 @@ def _get_metadata_for_traces( ) -> tuple[list[GetTracesResponse.Trace], Any]: # We use the item type specified in the request meta for the trace item filter conditions. # If no item type is specified, we use all the filters. - filter_expressions_by_item_type = self._get_trace_item_filter_expressions(request.filters) + filter_expressions_by_item_type = self._get_trace_item_filter_expressions( + request.filters, request.meta + ) trace_item_filters_expression = None item_type = None if request.meta.trace_item_type in filter_expressions_by_item_type: @@ -787,7 +794,9 @@ def _get_metadata_for_traces_with_subquery( """ # We use the item type specified in the request meta for the trace item filter conditions. # If no item type is specified, we use all the filters. - filter_expressions_by_item_type = self._get_trace_item_filter_expressions(request.filters) + filter_expressions_by_item_type = self._get_trace_item_filter_expressions( + request.filters, request.meta + ) trace_item_filters_expression = None item_type = None if request.meta.trace_item_type in filter_expressions_by_item_type: diff --git a/snuba/web/rpc/v1/endpoint_trace_item_details.py b/snuba/web/rpc/v1/endpoint_trace_item_details.py index 232326661c2..e78fd07d7db 100644 --- a/snuba/web/rpc/v1/endpoint_trace_item_details.py +++ b/snuba/web/rpc/v1/endpoint_trace_item_details.py @@ -34,6 +34,7 @@ decode_attributes_array_value, trace_item_filters_to_expression, treeify_or_and_conditions, + use_array_map_columns, ) from snuba.web.rpc.common.debug_info import ( extract_response_meta, @@ -99,7 +100,11 @@ def _build_query(request: TraceItemDetailsRequest) -> Query: column("trace_id"), literal(request.trace_id), ), - trace_item_filters_to_expression(request.filter, attribute_key_to_expression), + trace_item_filters_to_expression( + request.filter, + attribute_key_to_expression, + use_array_map_columns=use_array_map_columns(request.meta), + ), ), limit=1, ) diff --git a/snuba/web/rpc/v1/resolvers/R_eap_items/heatmap_builder.py b/snuba/web/rpc/v1/resolvers/R_eap_items/heatmap_builder.py index e58c116e311..f68589f258e 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_items/heatmap_builder.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_items/heatmap_builder.py @@ -35,6 +35,7 @@ base_conditions_and, trace_item_filters_to_expression, treeify_or_and_conditions, + use_array_map_columns, ) from snuba.web.rpc.common.debug_info import setup_trace_query_settings from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException @@ -124,6 +125,7 @@ def _get_min_max_bucketsize_y( filter_expression = trace_item_filters_to_expression( filter, (attribute_key_to_expression), + use_array_map_columns=use_array_map_columns(in_msg.meta), ) condition = base_conditions_and(in_msg.meta, filter_expression) min_max_query = Query( @@ -214,6 +216,7 @@ def _build_heatmap_query( filter_expression = trace_item_filters_to_expression( filter, (attribute_key_to_expression), + use_array_map_columns=use_array_map_columns(self.in_msg.meta), ) condition = base_conditions_and(self.in_msg.meta, filter_expression) bucket_index_y = f.least( diff --git a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_time_series.py b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_time_series.py index 5e03379def2..c562005305d 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_time_series.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_time_series.py @@ -45,6 +45,7 @@ base_conditions_and, trace_item_filters_to_expression, treeify_or_and_conditions, + use_array_map_columns, use_sampling_factor, valid_sampling_factor_conditions, ) @@ -419,7 +420,9 @@ def build_query( condition=base_conditions_and( request.meta, trace_item_filters_to_expression( - request.filter, _get_attribute_key_to_expression_function(request.meta) + request.filter, + _get_attribute_key_to_expression_function(request.meta), + use_array_map_columns=use_array_map_columns(request.meta), ), valid_sampling_factor_conditions(), *item_type_conds, diff --git a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_stats.py b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_stats.py index dac6e4d9f0f..3d45ec19211 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_stats.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_stats.py @@ -45,6 +45,7 @@ base_conditions_and, trace_item_filters_to_expression, treeify_or_and_conditions, + use_array_map_columns, ) from snuba.web.rpc.common.debug_info import ( extract_response_meta, @@ -233,6 +234,7 @@ def _build_attr_distribution_query( trace_item_filters_expression = trace_item_filters_to_expression( in_msg.filter, (attribute_key_to_expression), + use_array_map_columns=use_array_map_columns(in_msg.meta), ) item_type_filter = f.equals(column("item_type"), in_msg.meta.trace_item_type) query = Query( diff --git a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py index caa9cd2da9f..f97fb627954 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py @@ -62,6 +62,7 @@ timestamp_in_range_condition, trace_item_filters_to_expression, treeify_or_and_conditions, + use_array_map_columns, use_sampling_factor, valid_sampling_factor_conditions, ) @@ -633,6 +634,7 @@ def build_query( trace_item_filters_to_expression( request.filter, attribute_key_to_expression, + use_array_map_columns=use_array_map_columns(request.meta), ), valid_sampling_factor_conditions(), *item_type_conds, diff --git a/snuba/web/rpc/v1/resolvers/common/cross_item_queries.py b/snuba/web/rpc/v1/resolvers/common/cross_item_queries.py index b51ddf6adef..ca5ac8fafe5 100644 --- a/snuba/web/rpc/v1/resolvers/common/cross_item_queries.py +++ b/snuba/web/rpc/v1/resolvers/common/cross_item_queries.py @@ -30,6 +30,7 @@ base_conditions_and, trace_item_filters_to_expression, treeify_or_and_conditions, + use_array_map_columns, ) # 50 million trace ids * 16 bytes per id = a limit of 1gigabyte memory usage per cross item query @@ -90,6 +91,7 @@ def get_trace_ids_sql_for_cross_item_query( trace_item_filters_to_expression( trace_filter.filter, attribute_key_to_expression, + use_array_map_columns=use_array_map_columns(request_meta), ), ) ) @@ -100,6 +102,7 @@ def get_trace_ids_sql_for_cross_item_query( trace_filter.filter, attribute_key_to_expression, membership_as_has=True, + use_array_map_columns=use_array_map_columns(request_meta), ), ) ) diff --git a/tests/web/rpc/test_common.py b/tests/web/rpc/test_common.py index d6ce06c540b..d8f870484de 100644 --- a/tests/web/rpc/test_common.py +++ b/tests/web/rpc/test_common.py @@ -31,9 +31,16 @@ from snuba import settings from snuba.datasets.storages.factory import get_storage from snuba.datasets.storages.storage_key import StorageKey -from snuba.protos.common import ATTRIBUTES_TO_COALESCE +from snuba.protos.common import ( + ATTRIBUTES_TO_COALESCE, + MalformedAttributeException, + type_array_to_membership_array_expression_from_typed_columns, +) from snuba.query.dsl import Functions as f from snuba.query.dsl import and_cond, column +from snuba.query.expressions import ( + Column as ColumnExpr, +) from snuba.query.expressions import ( Expression, FunctionCall, @@ -50,6 +57,7 @@ prev_monday, trace_item_filters_to_expression, treeify_or_and_conditions, + use_array_map_columns, use_sampling_factor, ) from snuba.web.rpc.common.exceptions import ( @@ -92,6 +100,27 @@ def test_use_sampling_factor(self, snuba_set_config: SnubaSetConfig) -> None: assert use_sampling_factor(RequestMeta(start_timestamp=Timestamp(seconds=10))) assert not use_sampling_factor(RequestMeta(start_timestamp=Timestamp(seconds=9))) + @pytest.mark.redis_db + def test_use_array_map_columns(self, snuba_set_config: SnubaSetConfig) -> None: + assert use_array_map_columns( + RequestMeta( + start_timestamp=Timestamp(seconds=settings.USE_ARRAY_MAP_COLUMNS_TIMESTAMP_SECONDS) + ) + ) + assert not use_array_map_columns( + RequestMeta( + start_timestamp=Timestamp( + seconds=settings.USE_ARRAY_MAP_COLUMNS_TIMESTAMP_SECONDS - 1 + ) + ) + ) + # A config value of 0 disables the typed-column read path entirely. + snuba_set_config("use_array_map_columns_timestamp_seconds", 0) + assert not use_array_map_columns(RequestMeta(start_timestamp=Timestamp(seconds=2**31))) + snuba_set_config("use_array_map_columns_timestamp_seconds", 10) + assert use_array_map_columns(RequestMeta(start_timestamp=Timestamp(seconds=10))) + assert not use_array_map_columns(RequestMeta(start_timestamp=Timestamp(seconds=9))) + class TestTraceItemFiltersArrayLike: def _make_like_filter( @@ -225,6 +254,120 @@ def test_not_like_on_int_key_raises(self) -> None: trace_item_filters_to_expression(item_filter, attribute_key_to_expression) +def _collect_column_names(expr: Expression) -> set[str]: + names: set[str] = set() + + def visit(node: Expression) -> Expression: + if isinstance(node, ColumnExpr): + names.add(node.column_name) + return node + + expr.transform(visit) + return names + + +_TYPED_ARRAY_COLUMNS = { + "attributes_array_string", + "attributes_array_float", + "attributes_array_bool", +} + + +class TestTraceItemFiltersArrayMapColumns: + """Array predicates read the typed ``attributes_array_*`` map columns when + ``use_array_map_columns`` is set, and the legacy ``attributes_array`` JSON + column otherwise. The int column is intentionally not read (ints are + double-written into ``attributes_array_float``).""" + + def _array_filter( + self, + op: ComparisonFilter.Op.ValueType, + value: AttributeValue, + ) -> TraceItemFilter: + return TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(type=AttributeKey.Type.TYPE_ARRAY, name="my_tags"), + op=op, + value=value, + ) + ) + + def test_like_on_array_key_uses_json_column_by_default(self) -> None: + result = trace_item_filters_to_expression( + self._array_filter(ComparisonFilter.OP_LIKE, AttributeValue(val_str="%error%")), + attribute_key_to_expression, + ) + assert isinstance(result, FunctionCall) + assert result.function_name == "arrayExists" + membership = result.parameters[1] + assert isinstance(membership, FunctionCall) + assert membership.function_name == "arrayMap" + assert _collect_column_names(membership) == {"attributes_array"} + + def test_like_on_array_key_uses_typed_columns_when_enabled(self) -> None: + result = trace_item_filters_to_expression( + self._array_filter(ComparisonFilter.OP_LIKE, AttributeValue(val_str="%error%")), + attribute_key_to_expression, + use_array_map_columns=True, + ) + assert isinstance(result, FunctionCall) + assert result.function_name == "arrayExists" + membership = result.parameters[1] + assert isinstance(membership, FunctionCall) + assert membership.function_name == "arrayConcat" + assert _collect_column_names(membership) == _TYPED_ARRAY_COLUMNS + + def test_equals_on_array_key_uses_typed_columns_when_enabled(self) -> None: + result = trace_item_filters_to_expression( + self._array_filter(ComparisonFilter.OP_EQUALS, AttributeValue(val_str="error")), + attribute_key_to_expression, + use_array_map_columns=True, + ) + assert isinstance(result, FunctionCall) + assert result.function_name == "arrayExists" + membership = result.parameters[1] + assert isinstance(membership, FunctionCall) + assert membership.function_name == "arrayConcat" + assert _collect_column_names(membership) == _TYPED_ARRAY_COLUMNS + + def test_exists_filter_on_array_key_uses_typed_columns_when_enabled(self) -> None: + item_filter = TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey(type=AttributeKey.Type.TYPE_ARRAY, name="my_tags") + ) + ) + result = trace_item_filters_to_expression( + item_filter, attribute_key_to_expression, use_array_map_columns=True + ) + # Existence is notEmpty(arrayConcat(...)) over the typed columns. + assert isinstance(result, FunctionCall) + assert result.function_name == "notEmpty" + inner = result.parameters[0] + assert isinstance(inner, FunctionCall) + assert inner.function_name == "arrayConcat" + assert _collect_column_names(inner) == _TYPED_ARRAY_COLUMNS + + def test_exists_filter_on_array_key_uses_json_column_by_default(self) -> None: + item_filter = TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey(type=AttributeKey.Type.TYPE_ARRAY, name="my_tags") + ) + ) + result = trace_item_filters_to_expression(item_filter, attribute_key_to_expression) + assert isinstance(result, FunctionCall) + assert result.function_name == "notEmpty" + inner = result.parameters[0] + assert isinstance(inner, FunctionCall) + assert inner.function_name == "arrayMap" + assert _collect_column_names(inner) == {"attributes_array"} + + def test_typed_membership_function_rejects_non_array(self) -> None: + with pytest.raises(MalformedAttributeException): + type_array_to_membership_array_expression_from_typed_columns( + AttributeKey(type=AttributeKey.Type.TYPE_STRING, name="my_tags") + ) + + class TestExistsFilterCoalesced: """exists_filter on coalesced attributes must check all deprecated keys.""" From a637d6d231fdb2286afd33a1b9e0640ccece2451 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 25 Jun 2026 00:37:00 +0000 Subject: [PATCH 2/3] fix(eap-items): include attributes_array_int in the typed membership array MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Array integers are written only to `attributes_array_int`, not also to the float column — `AttributeMap::insert_array` (eap_items Rust processor) routes each element to exactly one typed column by type, unlike the scalar `insert_int` double-write. Reading only string/float/bool therefore missed int-only arrays entirely: membership filters never matched and `exists_filter` reported the key as absent. Read all four typed columns in the `arrayConcat`, mapping ints to strings via `toString`. Element types never overlap across the typed columns, so no element is duplicated. Caught by Cursor Bugbot review. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01J9drsbsd3QyVPEFy8ALB7A --- snuba/protos/common.py | 41 +++++++++++++++++++++--------------- tests/web/rpc/test_common.py | 5 +++-- 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/snuba/protos/common.py b/snuba/protos/common.py index 27cd8659f09..99e69e5d79d 100644 --- a/snuba/protos/common.py +++ b/snuba/protos/common.py @@ -207,14 +207,15 @@ def type_array_to_membership_array_expression_from_typed_columns( windows new enough that those columns are fully populated (see ``use_array_map_columns``) we read them instead. - Returns a normalized ``Array(String)`` of every element across the string, - float, and bool columns so the per-element comparisons built by + Returns a normalized ``Array(String)`` of every element across all four typed + columns so the per-element comparisons built by ``_type_array_membership_rhs_expression`` keep matching the JSON-column behaviour (string elements stay as-is, numbers become ``toString(...)``, and - bools become ``'true'``/``'false'``). Integer elements are double-written into - ``attributes_array_float`` (the read path resolves all numeric types to the - float column), so the int column is intentionally not read — including it - would duplicate every integer element. + bools become ``'true'``/``'false'``). Unlike the scalar double-write, array + integers are written only to ``attributes_array_int`` (not also to the float + column — see ``AttributeMap::insert_array`` in the ``eap_items`` Rust + processor), so the int column must be read for int arrays to match; element + types never overlap across columns, so no element is duplicated. """ if attr_key.type != AttributeKey.Type.TYPE_ARRAY: raise MalformedAttributeException( @@ -223,16 +224,22 @@ def type_array_to_membership_array_expression_from_typed_columns( ) alias = f"{_build_label_mapping_key(attr_key)}__array_members" name = attr_key.name - x = Argument(None, "x") + + def _to_string_elements(col_name: str) -> FunctionCall: + x = Argument(None, "x") + return FunctionCall( + None, + "arrayMap", + ( + Lambda(None, ("x",), FunctionCall(None, "toString", (x,))), + arrayElement(None, column(col_name), literal(name)), + ), + ) + string_elements = arrayElement(None, column("attributes_array_string"), literal(name)) - float_elements = FunctionCall( - None, - "arrayMap", - ( - Lambda(None, ("x",), FunctionCall(None, "toString", (x,))), - arrayElement(None, column("attributes_array_float"), literal(name)), - ), - ) + int_elements = _to_string_elements("attributes_array_int") + float_elements = _to_string_elements("attributes_array_float") + bool_x = Argument(None, "x") bool_elements = FunctionCall( None, "arrayMap", @@ -240,7 +247,7 @@ def type_array_to_membership_array_expression_from_typed_columns( Lambda( None, ("x",), - FunctionCall(None, "if", (x, literal("true"), literal("false"))), + FunctionCall(None, "if", (bool_x, literal("true"), literal("false"))), ), arrayElement(None, column("attributes_array_bool"), literal(name)), ), @@ -248,7 +255,7 @@ def type_array_to_membership_array_expression_from_typed_columns( return FunctionCall( alias=alias, function_name="arrayConcat", - parameters=(string_elements, float_elements, bool_elements), + parameters=(string_elements, int_elements, float_elements, bool_elements), ) diff --git a/tests/web/rpc/test_common.py b/tests/web/rpc/test_common.py index d8f870484de..7e32b8c515f 100644 --- a/tests/web/rpc/test_common.py +++ b/tests/web/rpc/test_common.py @@ -268,6 +268,7 @@ def visit(node: Expression) -> Expression: _TYPED_ARRAY_COLUMNS = { "attributes_array_string", + "attributes_array_int", "attributes_array_float", "attributes_array_bool", } @@ -276,8 +277,8 @@ def visit(node: Expression) -> Expression: class TestTraceItemFiltersArrayMapColumns: """Array predicates read the typed ``attributes_array_*`` map columns when ``use_array_map_columns`` is set, and the legacy ``attributes_array`` JSON - column otherwise. The int column is intentionally not read (ints are - double-written into ``attributes_array_float``).""" + column otherwise. All four typed columns are read: unlike the scalar + double-write, array ints live only in ``attributes_array_int``.""" def _array_filter( self, From 3f204f38700e3f02aab017b949f7baf4c1b27529 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 25 Jun 2026 01:00:23 +0000 Subject: [PATCH 3/3] feat(eap-items): thread use_array_map_columns through conditional aggregations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Conditional-aggregation filters (the predicate inside countIf/sumIf, built by `_get_condition_in_aggregation`) on array attributes were still resolving against the legacy `attributes_array` JSON column, because the aggregation helpers didn't propagate `use_array_map_columns`. Results were correct (the JSON column is always written) but didn't use the typed columns. Thread `use_array_map_columns` through the aggregation chain alongside the existing `use_sampling_factor` parameter — `aggregation_to_expression`, `get_count_column`, `get_average_sample_rate_column`, `get_confidence_interval_column`, `get_extrapolated_function`, and the `_get_ci_*` helpers — down to `_get_condition_in_aggregation`, and pass `use_array_map_columns(meta)` at the trace-item-table and time-series call sites. The aggregate field itself (and SELECT) keep reading the JSON column, as before. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01J9drsbsd3QyVPEFy8ALB7A --- .../R_eap_items/resolver_time_series.py | 13 +++-- .../R_eap_items/resolver_trace_item_table.py | 6 +++ .../rpc/v1/resolvers/common/aggregation.py | 47 +++++++++++++++---- tests/web/rpc/test_aggregation.py | 40 ++++++++++++++++ 4 files changed, 93 insertions(+), 13 deletions(-) diff --git a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_time_series.py b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_time_series.py index c562005305d..d0c3b34b54e 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_time_series.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_time_series.py @@ -247,7 +247,9 @@ def _get_reliability_context_columns( ExtrapolationMode.EXTRAPOLATION_MODE_SERVER_ONLY, ]: confidence_interval_column = get_confidence_interval_column( - aggregation, _get_attribute_key_to_expression_function(request_meta) + aggregation, + _get_attribute_key_to_expression_function(request_meta), + use_array_map_columns=use_array_map_columns(request_meta), ) if confidence_interval_column is not None: additional_context_columns.append( @@ -258,7 +260,9 @@ def _get_reliability_context_columns( ) average_sample_rate_column = get_average_sample_rate_column( - aggregation, _get_attribute_key_to_expression_function(request_meta) + aggregation, + _get_attribute_key_to_expression_function(request_meta), + use_array_map_columns=use_array_map_columns(request_meta), ) additional_context_columns.append( SelectedExpression( @@ -267,7 +271,9 @@ def _get_reliability_context_columns( ) ) count_column = get_count_column( - aggregation, _get_attribute_key_to_expression_function(request_meta) + aggregation, + _get_attribute_key_to_expression_function(request_meta), + use_array_map_columns=use_array_map_columns(request_meta), ) additional_context_columns.append( SelectedExpression(name=count_column.alias, expression=count_column) @@ -297,6 +303,7 @@ def _proto_expression_to_ast_expression( expr.conditional_aggregation, (attribute_key_to_expression), use_sampling_factor(request_meta), + use_array_map_columns(request_meta), ) match expr.conditional_aggregation.WhichOneof("default_value"): case None: diff --git a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py index f97fb627954..a5c86f7813f 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py @@ -240,6 +240,7 @@ def aggregation_filter_to_expression( agg_filter.comparison_filter.conditional_aggregation, attribute_key_to_expression, use_sampling_factor(request_meta), + use_array_map_columns(request_meta), ), agg_filter.comparison_filter.val, ) @@ -351,6 +352,7 @@ def _convert_order_by( x.column.conditional_aggregation, attribute_key_to_expression, use_sampling_factor(request_meta), + use_array_map_columns(request_meta), ), ) ) @@ -425,6 +427,7 @@ def _get_reliability_context_columns( confidence_interval_column = get_confidence_interval_column( column.conditional_aggregation, attribute_key_to_expression, + use_array_map_columns=use_array_map_columns(request_meta), ) if confidence_interval_column is not None: context_columns.append( @@ -437,6 +440,7 @@ def _get_reliability_context_columns( average_sample_rate_column = get_average_sample_rate_column( column.conditional_aggregation, attribute_key_to_expression, + use_array_map_columns=use_array_map_columns(request_meta), ) context_columns.append( SelectedExpression( @@ -448,6 +452,7 @@ def _get_reliability_context_columns( count_column = get_count_column( column.conditional_aggregation, attribute_key_to_expression, + use_array_map_columns=use_array_map_columns(request_meta), ) context_columns.append(SelectedExpression(name=count_column.alias, expression=count_column)) return context_columns @@ -528,6 +533,7 @@ def _column_to_expression(column: Column, request_meta: RequestMeta) -> Expressi column.conditional_aggregation, attribute_key_to_expression, use_sampling_factor(request_meta), + use_array_map_columns(request_meta), ) match column.conditional_aggregation.WhichOneof("default_value"): case None: diff --git a/snuba/web/rpc/v1/resolvers/common/aggregation.py b/snuba/web/rpc/v1/resolvers/common/aggregation.py index 674e0cc69ad..170fef03de2 100644 --- a/snuba/web/rpc/v1/resolvers/common/aggregation.py +++ b/snuba/web/rpc/v1/resolvers/common/aggregation.py @@ -63,6 +63,7 @@ def _get_condition_in_aggregation( aggregation: AttributeAggregation | AttributeConditionalAggregation, attribute_key_to_expression: Callable[[AttributeKey], Expression], + use_array_map_columns: bool = False, ) -> Expression: condition_in_aggregation: Expression = literal(True) if isinstance(aggregation, AttributeConditionalAggregation): @@ -71,7 +72,10 @@ def _get_condition_in_aggregation( # result-block column name stable across mixed-version ClickHouse nodes on # distributed reads (membership_as_has, see common._in_or_has). condition_in_aggregation = trace_item_filters_to_expression( - aggregation.filter, attribute_key_to_expression, membership_as_has=True + aggregation.filter, + attribute_key_to_expression, + membership_as_has=True, + use_array_map_columns=use_array_map_columns, ) return condition_in_aggregation @@ -375,6 +379,7 @@ def get_average_sample_rate_column( aggregation: AttributeConditionalAggregation, attribute_key_to_expression: Callable[[AttributeKey], Expression], use_sampling_factor: bool = False, + use_array_map_columns: bool = False, ) -> Expression: alias = CustomColumnInformation( custom_column_id="average_sample_rate", @@ -387,7 +392,7 @@ def get_average_sample_rate_column( ) field, field_exists = _resolve_field_and_existence(aggregation, attribute_key_to_expression) condition_in_aggregation = _get_condition_in_aggregation( - aggregation, attribute_key_to_expression + aggregation, attribute_key_to_expression, use_array_map_columns ) return f.divide( f.countIf( @@ -415,13 +420,16 @@ def _get_count_column_alias( def get_count_column( aggregation: AttributeConditionalAggregation, attribute_key_to_expression: Callable[[AttributeKey], Expression], + use_array_map_columns: bool = False, ) -> Expression: field, field_exists = _resolve_field_and_existence(aggregation, attribute_key_to_expression) return f.countIf( field, and_cond( field_exists, - _get_condition_in_aggregation(aggregation, attribute_key_to_expression), + _get_condition_in_aggregation( + aggregation, attribute_key_to_expression, use_array_map_columns + ), ), alias=_get_count_column_alias(aggregation), ) @@ -484,11 +492,12 @@ def get_extrapolated_function( field_exists: Expression, attribute_key_to_expression: Callable[[AttributeKey], Expression], use_sampling_factor: bool = False, + use_array_map_columns: bool = False, ) -> CurriedFunctionCall | FunctionCall | None: alias = aggregation.label if aggregation.label else None alias_dict = {"alias": alias} if alias else {} condition_in_aggregation = _get_condition_in_aggregation( - aggregation, attribute_key_to_expression + aggregation, attribute_key_to_expression, use_array_map_columns ) sampling_weight = _get_sampling_weight_expression( @@ -606,6 +615,7 @@ def _get_ci_count( alias: str | None = None, z_value: float = Z_VALUE_P95, use_sampling_factor: bool = False, + use_array_map_columns: bool = False, ) -> Expression: r""" confidence interval = Z \cdot \sqrt{\sum_{i=1}^n w_i^2 - w_i} @@ -629,7 +639,7 @@ def _get_ci_count( field, field_exists = _resolve_field_and_existence(aggregation, attribute_key_to_expression) condition_in_aggregation = _get_condition_in_aggregation( - aggregation, attribute_key_to_expression + aggregation, attribute_key_to_expression, use_array_map_columns ) alias_dict = {"alias": alias} if alias else {} sampling_weight = _get_sampling_weight_expression( @@ -656,6 +666,7 @@ def _get_ci_sum( alias: str | None = None, z_value: float = Z_VALUE_P95, use_sampling_factor: bool = False, + use_array_map_columns: bool = False, ) -> Expression: r""" confidence interval = Z \cdot \sqrt{\sum_{i=1}^n x_i^2 \cdot (w_i^2 - w_i)} @@ -673,7 +684,7 @@ def _get_ci_sum( field, field_exists = _resolve_field_and_existence(aggregation, attribute_key_to_expression) condition_in_aggregation = _get_condition_in_aggregation( - aggregation, attribute_key_to_expression + aggregation, attribute_key_to_expression, use_array_map_columns ) alias_dict = {"alias": alias} if alias else {} sampling_weight = _get_sampling_weight_expression( @@ -702,6 +713,7 @@ def _get_ci_avg( attribute_key_to_expression: Callable[[AttributeKey], Expression], alias: str | None = None, use_sampling_factor: bool = False, + use_array_map_columns: bool = False, ) -> Expression: """ confidence interval = (\\frac{t + err_t}{c - err_c} - \\frac{t - err_t}{c + err_c}) \\cdot 0.5 @@ -728,7 +740,7 @@ def _get_ci_avg( field, field_exists = _resolve_field_and_existence(aggregation, attribute_key_to_expression) condition_in_aggregation = _get_condition_in_aggregation( - aggregation, attribute_key_to_expression + aggregation, attribute_key_to_expression, use_array_map_columns ) alias_dict = {"alias": alias} if alias else {} sampling_weight = _get_sampling_weight_expression( @@ -753,6 +765,7 @@ def _get_ci_avg( f"{alias}__sum_err", Z_VALUE_P975, use_sampling_factor, + use_array_map_columns, ) expr_count_err = _get_ci_count( aggregation, @@ -760,6 +773,7 @@ def _get_ci_avg( f"{alias}__count_err", Z_VALUE_P975, use_sampling_factor, + use_array_map_columns, ) return f.divide( @@ -784,6 +798,7 @@ def get_confidence_interval_column( aggregation: AttributeConditionalAggregation, attribute_key_to_expression: Callable[[AttributeKey], Expression], use_sampling_factor: bool = False, + use_array_map_columns: bool = False, ) -> Expression | None: """ Returns the expression for calculating the upper confidence limit for a given aggregation. If the aggregation cannot be extrapolated, returns None. @@ -798,15 +813,21 @@ def get_confidence_interval_column( attribute_key_to_expression, alias, use_sampling_factor=use_sampling_factor, + use_array_map_columns=use_array_map_columns, ), Function.FUNCTION_SUM: _get_ci_sum( aggregation, attribute_key_to_expression, alias, use_sampling_factor=use_sampling_factor, + use_array_map_columns=use_array_map_columns, ), Function.FUNCTION_AVG: _get_ci_avg( - aggregation, attribute_key_to_expression, alias, use_sampling_factor + aggregation, + attribute_key_to_expression, + alias, + use_sampling_factor, + use_array_map_columns, ), Function.FUNCTION_P50: _get_possible_percentiles_expression( aggregation, 0.5, attribute_key_to_expression, use_sampling_factor @@ -879,12 +900,13 @@ def aggregation_to_expression( aggregation: AttributeConditionalAggregation, attribute_key_to_expression: Callable[[AttributeKey], Expression], use_sampling_factor: bool = False, + use_array_map_columns: bool = False, ) -> Expression: field, field_exists = _resolve_field_and_existence(aggregation, attribute_key_to_expression) alias = aggregation.label if aggregation.label else None alias_dict = {"alias": alias} if alias else {} condition_in_aggregation = _get_condition_in_aggregation( - aggregation, attribute_key_to_expression + aggregation, attribute_key_to_expression, use_array_map_columns ) if aggregation.key.type == AttributeKey.Type.TYPE_ARRAY: @@ -953,7 +975,12 @@ def aggregation_to_expression( ExtrapolationMode.EXTRAPOLATION_MODE_SERVER_ONLY, ]: agg_func_expr = get_extrapolated_function( - aggregation, field, field_exists, attribute_key_to_expression, use_sampling_factor + aggregation, + field, + field_exists, + attribute_key_to_expression, + use_sampling_factor, + use_array_map_columns, ) else: agg_func_expr = function_map.get(aggregation.aggregate) diff --git a/tests/web/rpc/test_aggregation.py b/tests/web/rpc/test_aggregation.py index cfb938ec7d8..1ec82e04f04 100644 --- a/tests/web/rpc/test_aggregation.py +++ b/tests/web/rpc/test_aggregation.py @@ -385,3 +385,43 @@ def test_conditional_aggregation_uses_has_for_in_sets() -> None: and [p.value for p in e.parameters[0].parameters if isinstance(p, Literal)] == project_ids ] assert has_over_pids, "expected has(array(), x) in the conditional aggregate" + + +def _aggregation_column_names(expr: Any) -> set[str]: + return {e.column_name for e in expr if isinstance(e, Column)} + + +def test_conditional_aggregation_array_filter_uses_typed_columns() -> None: + """A conditional aggregation whose filter is on an array attribute reads the + typed ``attributes_array_*`` columns when ``use_array_map_columns`` is set, and + the legacy ``attributes_array`` JSON column otherwise.""" + agg = AttributeConditionalAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="my.field"), + label="sum(my.field)", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(type=AttributeKey.TYPE_ARRAY, name="my_tags"), + op=ComparisonFilter.OP_LIKE, + value=AttributeValue(val_str="%error%"), + ) + ), + ) + + default_cols = _aggregation_column_names( + aggregation_to_expression(agg, attribute_key_to_expression) + ) + assert "attributes_array" in default_cols + assert not any(c.startswith("attributes_array_") for c in default_cols) + + typed_cols = _aggregation_column_names( + aggregation_to_expression(agg, attribute_key_to_expression, use_array_map_columns=True) + ) + assert { + "attributes_array_string", + "attributes_array_int", + "attributes_array_float", + "attributes_array_bool", + } <= typed_cols + assert "attributes_array" not in typed_cols