Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions snuba/protos/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,69 @@ def type_array_to_stored_array_json_path(attr_key: AttributeKey) -> JsonPath:
)


def type_array_to_membership_array_expression_from_typed_columns(
    attr_key: AttributeKey,
) -> FunctionCall:
    """Build the WHERE-clause membership array from the typed array map columns.

    This is the typed-column sibling of
    ``type_array_to_membership_array_expression`` (which reads the legacy
    ``attributes_array`` JSON column). Array attributes have been double-written
    into typed ``Map(String, Array(T))`` columns since 2026-06-22, and callers
    switch to this expression once the query window is recent enough for those
    columns to be fully populated (see ``use_array_map_columns``).

    The result is a single ``Array(String)`` holding every element found under
    the attribute name in the four typed columns, normalized the same way as the
    JSON-column path so ``_type_array_membership_rhs_expression`` comparisons keep
    matching:

    * string elements are passed through untouched,
    * int and float elements are wrapped in ``toString(...)``,
    * bool elements become ``'true'`` / ``'false'``.

    Array integers are written only to ``attributes_array_int`` (unlike the
    scalar double-write they are not mirrored into the float column — see
    ``AttributeMap::insert_array`` in the ``eap_items`` Rust processor), so the
    int column has to be read for int arrays to match. Element types never
    overlap across columns, so concatenating them cannot duplicate an element.

    :param attr_key: the attribute to resolve; must be of type ``TYPE_ARRAY``.
    :return: an ``arrayConcat(...)`` call aliased
        ``<label mapping key>__array_members``.
    :raises MalformedAttributeException: if ``attr_key`` is not ``TYPE_ARRAY``.
    """
    if attr_key.type != AttributeKey.Type.TYPE_ARRAY:
        actual_type = AttributeKey.Type.Name(attr_key.type)
        raise MalformedAttributeException(
            "type_array_to_membership_array_expression_from_typed_columns expected "
            f"TYPE_ARRAY, got {actual_type}"
        )

    members_alias = f"{_build_label_mapping_key(attr_key)}__array_members"
    attr_name = attr_key.name

    def _lookup(col_name: str) -> Expression:
        # <col_name>[attr_name]: the array stored under this attribute's key.
        return arrayElement(None, column(col_name), literal(attr_name))

    def _mapped(col_name: str, per_element: Expression) -> FunctionCall:
        # arrayMap(x -> per_element, <col_name>[attr_name])
        return FunctionCall(
            None,
            "arrayMap",
            (Lambda(None, ("x",), per_element), _lookup(col_name)),
        )

    def _numeric_as_strings(col_name: str) -> FunctionCall:
        # Numbers are compared as their toString(...) rendering.
        return _mapped(col_name, FunctionCall(None, "toString", (Argument(None, "x"),)))

    from_strings = _lookup("attributes_array_string")
    from_ints = _numeric_as_strings("attributes_array_int")
    from_floats = _numeric_as_strings("attributes_array_float")
    # Bools are rendered as the literals 'true' / 'false' rather than via toString.
    from_bools = _mapped(
        "attributes_array_bool",
        FunctionCall(
            None,
            "if",
            (Argument(None, "x"), literal("true"), literal("false")),
        ),
    )

    return FunctionCall(
        members_alias,
        "arrayConcat",
        (from_strings, from_ints, from_floats, from_bools),
    )


def attribute_key_to_expression(attr_key: AttributeKey) -> Expression:
"""Convert an AttributeKey proto to a Snuba Expression.

Expand Down
7 changes: 7 additions & 0 deletions snuba/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,13 @@ class RedisClusters(TypedDict):
# Represents 10AM PST April 8, 2025 which is the date we started writing the sampling factor. We can remove this setting once 90 days have passed since this date.
USE_SAMPLING_FACTOR_TIMESTAMP_SECONDS = 1744131600

# Represents 2026-06-23 00:00 UTC, the day after we began double-writing array
# attributes into the typed `attributes_array_*` map columns (2026-06-22). Only
# queries whose window starts on/after this read those typed columns; older data
# only exists in the legacy `attributes_array` JSON column. We can remove this
# setting once enough time has passed that all queryable data is double-written.
# This is the default for the `use_array_map_columns_timestamp_seconds` runtime
# config read by `use_array_map_columns`; a runtime value of 0 disables the
# typed-column read path entirely.
USE_ARRAY_MAP_COLUMNS_TIMESTAMP_SECONDS = 1782172800

# Processor/Writer Options


Expand Down
85 changes: 74 additions & 11 deletions snuba/web/rpc/common/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
PROTO_TYPE_TO_CLICKHOUSE_TYPE,
MalformedAttributeException,
type_array_to_membership_array_expression,
type_array_to_membership_array_expression_from_typed_columns,
)
from snuba.protos.common import (
attribute_key_to_expression as _attribute_key_to_expression,
Expand Down Expand Up @@ -86,14 +87,22 @@ def attribute_key_to_expression(attr_key: AttributeKey) -> Expression:


def _trace_item_filter_key_expression(
attr_to_key_expression_callable: Callable[[AttributeKey], Expression], key: AttributeKey
attr_to_key_expression_callable: Callable[[AttributeKey], Expression],
key: AttributeKey,
use_array_map_columns: bool = False,
) -> Expression:
"""predicates must use the normalized
``arrayMap`` (``type_array_to_membership_array_expression``) so
``arrayExists`` compares per element. It is different from SELECT predicate.

When ``use_array_map_columns`` is set, array predicates read the typed
``attributes_array_*`` map columns instead of the legacy ``attributes_array``
JSON column (see ``use_array_map_columns``).
"""
if key.type == AttributeKey.Type.TYPE_ARRAY:
try:
if use_array_map_columns:
return type_array_to_membership_array_expression_from_typed_columns(key)
return type_array_to_membership_array_expression(key)
except MalformedAttributeException as e:
raise BadSnubaRPCRequestException(str(e)) from e
Expand Down Expand Up @@ -248,6 +257,27 @@ def use_sampling_factor(meta: RequestMeta) -> bool:
return meta.start_timestamp.seconds >= use_sampling_factor_timestamp_seconds


def use_array_map_columns(meta: RequestMeta) -> bool:
    """Return whether this request may read the typed ``attributes_array_*`` columns.

    Array attributes have been double-written into the typed map columns since
    2026-06-22; anything older lives only in the legacy ``attributes_array`` JSON
    column. The typed columns are therefore only safe to read when the query
    window starts on/after the cutoff (2026-06-23 UTC by default).

    The cutoff comes from the ``use_array_map_columns_timestamp_seconds`` runtime
    config, defaulting to ``settings.USE_ARRAY_MAP_COLUMNS_TIMESTAMP_SECONDS``.
    Setting it to 0 turns the typed-column read path off entirely.

    :param meta: request metadata; only ``start_timestamp.seconds`` is consulted.
    :return: ``True`` if the typed columns should be used for this request.
    """
    cutoff_seconds = cast(
        int,
        state.get_int_config(
            "use_array_map_columns_timestamp_seconds",
            settings.USE_ARRAY_MAP_COLUMNS_TIMESTAMP_SECONDS,
        ),
    )
    # 0 is the kill switch: never use the typed columns, whatever the window is.
    return cutoff_seconds != 0 and meta.start_timestamp.seconds >= cutoff_seconds


def treeify_or_and_conditions(query: Query) -> None:
"""
look for expressions like or(a, b, c) and turn them into or(a, or(b, c))
Expand Down Expand Up @@ -735,6 +765,7 @@ def trace_item_filters_to_expression(
item_filter: TraceItemFilter,
attribute_key_to_expression: Callable[[AttributeKey], Expression],
membership_as_has: bool = False,
use_array_map_columns: bool = False,
) -> Expression:
"""
Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024")
Expand All @@ -746,6 +777,10 @@ def trace_item_filters_to_expression(
``IN`` set leaks an unstable ``__set_*`` identifier into the result-block column
name and breaks mixed-version distributed reads (see ``_in_or_has``). Leave the
default for WHERE clauses, where the prepared ``IN`` set drives pruning.
:param use_array_map_columns: resolve ``TYPE_ARRAY`` predicates against the typed
``attributes_array_*`` map columns instead of the legacy ``attributes_array``
JSON column. Pass ``use_array_map_columns(meta)`` so this is only enabled for
query windows new enough that the typed columns are fully populated.
:return:
"""
if item_filter.HasField("and_filter"):
Expand All @@ -754,11 +789,19 @@ def trace_item_filters_to_expression(
return literal(True)
elif len(filters) == 1:
return trace_item_filters_to_expression(
filters[0], attribute_key_to_expression, membership_as_has
filters[0],
attribute_key_to_expression,
membership_as_has,
use_array_map_columns,
)
return and_cond(
*(
trace_item_filters_to_expression(x, attribute_key_to_expression, membership_as_has)
trace_item_filters_to_expression(
x,
attribute_key_to_expression,
membership_as_has,
use_array_map_columns,
)
for x in filters
)
)
Comment thread
sentry[bot] marked this conversation as resolved.
Expand All @@ -769,11 +812,19 @@ def trace_item_filters_to_expression(
raise BadSnubaRPCRequestException("Invalid trace item filter, empty 'or' clause")
elif len(filters) == 1:
return trace_item_filters_to_expression(
filters[0], attribute_key_to_expression, membership_as_has
filters[0],
attribute_key_to_expression,
membership_as_has,
use_array_map_columns,
)
return or_cond(
*(
trace_item_filters_to_expression(x, attribute_key_to_expression, membership_as_has)
trace_item_filters_to_expression(
x,
attribute_key_to_expression,
membership_as_has,
use_array_map_columns,
)
for x in filters
)
)
Expand All @@ -785,14 +836,20 @@ def trace_item_filters_to_expression(
elif len(filters) == 1:
return not_cond(
trace_item_filters_to_expression(
filters[0], attribute_key_to_expression, membership_as_has
filters[0],
attribute_key_to_expression,
membership_as_has,
use_array_map_columns,
)
)
return not_cond(
and_cond(
*(
trace_item_filters_to_expression(
x, attribute_key_to_expression, membership_as_has
x,
attribute_key_to_expression,
membership_as_has,
use_array_map_columns,
)
for x in filters
)
Expand All @@ -808,7 +865,9 @@ def trace_item_filters_to_expression(
_validate_comparison_filter_type_array(op, v)

k_expression = _trace_item_filter_key_expression(
attr_to_key_expression_callable=attribute_key_to_expression, key=k
attr_to_key_expression_callable=attribute_key_to_expression,
key=k,
use_array_map_columns=use_array_map_columns,
)

value_type = v.WhichOneof("value")
Expand Down Expand Up @@ -1074,6 +1133,7 @@ def trace_item_filters_to_expression(
_trace_item_filter_key_expression(
attr_to_key_expression_callable=attribute_key_to_expression,
key=item_filter.exists_filter.key,
use_array_map_columns=use_array_map_columns,
)
)

Expand Down Expand Up @@ -1180,9 +1240,12 @@ def get_subscriptable_field(field: Expression) -> SubscriptableReference | None:
if isinstance(field, FunctionCall) and field.function_name == "arrayElement":
return map_key_exists(field.parameters[0], field.parameters[1])

if isinstance(field, FunctionCall) and field.function_name == "arrayMap":
# Array attributes in the JSON column return empty arrays (not NULL)
# for missing keys, so notEmpty is the correct existence check.
if isinstance(field, FunctionCall) and field.function_name in ("arrayMap", "arrayConcat"):
# Array attributes return empty arrays (not NULL) for missing keys, so
# notEmpty is the correct existence check. This covers both the JSON-column
# membership expression (arrayMap) and the typed-column one (arrayConcat of
# the per-type map lookups, see
# type_array_to_membership_array_expression_from_typed_columns).
return f.notEmpty(field)

return f.isNotNull(field)
15 changes: 12 additions & 3 deletions snuba/web/rpc/v1/endpoint_get_traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
base_conditions_and,
trace_item_filters_to_expression,
treeify_or_and_conditions,
use_array_map_columns,
)
from snuba.web.rpc.common.debug_info import (
extract_response_meta,
Expand Down Expand Up @@ -556,7 +557,9 @@ def _is_cross_event_query(
return len(set([f.item_type for f in filters])) > 1

def _get_trace_item_filter_expressions(
self, filters: RepeatedCompositeFieldContainer[GetTracesRequest.TraceFilter]
self,
filters: RepeatedCompositeFieldContainer[GetTracesRequest.TraceFilter],
request_meta: RequestMeta,
) -> dict[TraceItemType.ValueType, Expression]:
"""
Returns a dict mapping item types to a filter expression for that item type.
Expand Down Expand Up @@ -584,6 +587,7 @@ def _get_trace_item_filter_expressions(
),
attribute_key_to_expression,
membership_as_has=True,
use_array_map_columns=use_array_map_columns(request_meta),
),
)

Expand All @@ -607,6 +611,7 @@ def _get_trace_ids_for_single_item_query(
),
),
attribute_key_to_expression,
use_array_map_columns=use_array_map_columns(request.meta),
)
selected_columns: list[SelectedExpression] = [
SelectedExpression(
Expand Down Expand Up @@ -671,7 +676,9 @@ def _get_metadata_for_traces(
) -> tuple[list[GetTracesResponse.Trace], Any]:
# We use the item type specified in the request meta for the trace item filter conditions.
# If no item type is specified, we use all the filters.
filter_expressions_by_item_type = self._get_trace_item_filter_expressions(request.filters)
filter_expressions_by_item_type = self._get_trace_item_filter_expressions(
request.filters, request.meta
)
trace_item_filters_expression = None
item_type = None
if request.meta.trace_item_type in filter_expressions_by_item_type:
Expand Down Expand Up @@ -787,7 +794,9 @@ def _get_metadata_for_traces_with_subquery(
"""
# We use the item type specified in the request meta for the trace item filter conditions.
# If no item type is specified, we use all the filters.
filter_expressions_by_item_type = self._get_trace_item_filter_expressions(request.filters)
filter_expressions_by_item_type = self._get_trace_item_filter_expressions(
request.filters, request.meta
)
trace_item_filters_expression = None
item_type = None
if request.meta.trace_item_type in filter_expressions_by_item_type:
Expand Down
7 changes: 6 additions & 1 deletion snuba/web/rpc/v1/endpoint_trace_item_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
decode_attributes_array_value,
trace_item_filters_to_expression,
treeify_or_and_conditions,
use_array_map_columns,
)
from snuba.web.rpc.common.debug_info import (
extract_response_meta,
Expand Down Expand Up @@ -99,7 +100,11 @@ def _build_query(request: TraceItemDetailsRequest) -> Query:
column("trace_id"),
literal(request.trace_id),
),
trace_item_filters_to_expression(request.filter, attribute_key_to_expression),
trace_item_filters_to_expression(
request.filter,
attribute_key_to_expression,
use_array_map_columns=use_array_map_columns(request.meta),
),
),
limit=1,
)
Expand Down
3 changes: 3 additions & 0 deletions snuba/web/rpc/v1/resolvers/R_eap_items/heatmap_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
base_conditions_and,
trace_item_filters_to_expression,
treeify_or_and_conditions,
use_array_map_columns,
)
from snuba.web.rpc.common.debug_info import setup_trace_query_settings
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
Expand Down Expand Up @@ -124,6 +125,7 @@ def _get_min_max_bucketsize_y(
filter_expression = trace_item_filters_to_expression(
filter,
(attribute_key_to_expression),
use_array_map_columns=use_array_map_columns(in_msg.meta),
)
condition = base_conditions_and(in_msg.meta, filter_expression)
min_max_query = Query(
Expand Down Expand Up @@ -214,6 +216,7 @@ def _build_heatmap_query(
filter_expression = trace_item_filters_to_expression(
filter,
(attribute_key_to_expression),
use_array_map_columns=use_array_map_columns(self.in_msg.meta),
)
condition = base_conditions_and(self.in_msg.meta, filter_expression)
bucket_index_y = f.least(
Expand Down
Loading
Loading