Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion gcm/monitoring/meta_utils/ods.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import json
import logging
import math
import time
from dataclasses import asdict, dataclass
from typing import Callable, List, Mapping, Optional, TYPE_CHECKING, TypedDict, Union
Expand Down Expand Up @@ -117,8 +118,18 @@ def write(

def get_payload(config: ODSConfig, data: List[ODSData]) -> ODSPayload:
datapoints = []
dropped = 0
for ods_data in data:
for key, value in ods_data.metrics.items():
# Non-finite values (NaN/Inf) serialize to bare `NaN`/`Infinity` tokens,
# which are invalid JSON. The ODS Graph API endpoint rejects the entire
# request with `400 (#100) param datapoints must be an array`, dropping
# every datapoint in the batch (not just the offending one). Skip them so
# a single bad metric (e.g. a mean/variance computed over zero samples)
# can't take down all of the cluster's metrics.
if isinstance(value, float) and not math.isfinite(value):
dropped += 1
continue
datapoints.append(
{
"entity": ods_data.entity,
Expand All @@ -128,12 +139,19 @@ def get_payload(config: ODSConfig, data: List[ODSData]) -> ODSPayload:
}
)

if dropped:
logger.warning(
f"Dropped {dropped} non-finite (NaN/Inf) datapoint(s) before ODS write "
f"for category {config.category_id}."
)
logger.debug(
f"Payload of {len(datapoints)} datapoints for ODS category {config.category_id}."
)
return {
"access_token": config.secret_key,
"datapoints": json.dumps(datapoints),
# allow_nan=False guards against any non-finite value slipping through above:
# it raises locally instead of emitting invalid JSON that the API would 400 on.
"datapoints": json.dumps(datapoints, allow_nan=False),
"category_id": config.category_id,
}

Expand Down
82 changes: 81 additions & 1 deletion gcm/tests/test_ods.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import pytest
import requests

import json

from gcm.monitoring.clock import ClockImpl
from gcm.monitoring.meta_utils.ods import ODSConfig, ODSData, write
from gcm.monitoring.meta_utils.ods import get_payload, ODSConfig, ODSData, write
from pytest_mock import MockerFixture
from requests_mock import Mocker as RequestsMocker

Expand Down Expand Up @@ -110,6 +112,84 @@ def test_write_raises(
will_retry.assert_has_calls(mocker.call(i) for i in range(1, num_retries + 1))


class TestGetPayload:
    """Tests for how ``get_payload`` serializes finite and non-finite metrics."""

    @staticmethod
    def _reject_constant(token: str) -> None:
        """Fail decoding when the payload contains a non-standard JSON constant.

        Python's ``json.loads`` accepts the bare tokens ``NaN``, ``Infinity`` and
        ``-Infinity`` by default, so a plain parse does not prove the payload is
        strict JSON. Passing this function as ``parse_constant`` turns any such
        token into an error.
        """
        raise ValueError(f"non-standard JSON constant in payload: {token}")

    @staticmethod
    def _strict_datapoints(payload: ODSPayloadLike) -> list:  # type: ignore[name-defined]
        """Decode ``payload["datapoints"]``, rejecting NaN/Infinity tokens."""
        return json.loads(
            payload["datapoints"],
            parse_constant=TestGetPayload._reject_constant,
        )

    @staticmethod
    def test_includes_finite_values(ods_config: ODSConfig) -> None:
        """Finite int and float metrics each produce one datapoint."""
        data = [
            ODSData(
                entity="fake_entity",
                time=TEST_TIME,
                metrics={"bar": 42, "baz": 100.0},
            )
        ]

        payload = get_payload(ods_config, data)
        datapoints = TestGetPayload._strict_datapoints(payload)

        assert len(datapoints) == 2
        assert {dp["key"] for dp in datapoints} == {"bar", "baz"}

    @staticmethod
    def test_drops_non_finite_values(ods_config: ODSConfig) -> None:
        """NaN, +Inf and -Inf metrics are omitted; finite ones are kept."""
        data = [
            ODSData(
                entity="fake_entity",
                time=TEST_TIME,
                metrics={
                    "good": 1.0,
                    "nan": float("nan"),
                    "inf": float("inf"),
                    "neg_inf": float("-inf"),
                    "also_good": 7,
                },
            )
        ]

        # The strict decode raises on bare NaN/Infinity tokens (the default
        # json.loads would silently accept them), so a successful parse shows
        # the serialized payload contains none of them.
        payload = get_payload(ods_config, data)
        datapoints = TestGetPayload._strict_datapoints(payload)

        assert {dp["key"] for dp in datapoints} == {"good", "also_good"}

    @staticmethod
    def test_non_finite_values_do_not_raise(ods_config: ODSConfig) -> None:
        """A batch holding only a NaN metric yields an empty datapoint list."""
        data = [
            ODSData(
                entity="fake_entity",
                time=TEST_TIME,
                metrics={"nan": float("nan")},
            )
        ]

        payload = get_payload(ods_config, data)

        assert TestGetPayload._strict_datapoints(payload) == []


def test_write_with_non_finite_values_sends_valid_json(
    requests_mock: RequestsMocker,
    mocker: MockerFixture,
    ods_config: ODSConfig,
) -> None:
    """``write`` posts once, without retrying, and omits the NaN metric."""
    retry_spy = mocker.MagicMock()
    requests_mock.post(ods_config.endpoint)
    batch = [
        ODSData(
            entity="fake_entity",
            time=TEST_TIME,
            metrics={"good": 1.0, "nan": float("nan")},
        )
    ]

    write(batch, ods_config, will_retry=retry_spy)

    # Exactly one request went out and the retry callback was never invoked.
    assert requests_mock.call_count == 1
    assert not retry_spy.called

    # Only the finite metric should appear in the body that was sent.
    request_body = requests_mock.last_request.json()
    sent_points = json.loads(request_body["datapoints"])
    sent_keys = {point["key"] for point in sent_points}
    assert sent_keys == {"good"}


class TestODSConfig:
@staticmethod
def test_loads_value() -> None:
Expand Down
Loading