Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/ad_buyer/booking/pricing.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,16 @@ class PricingCalculator:
# Volume discount thresholds (only for agency/advertiser tiers)
VOLUME_DISCOUNT_THRESHOLDS: list[tuple[int, float]] = [
(10_000_000, 10.0), # 10M+ impressions: 10% discount
(5_000_000, 5.0), # 5M+ impressions: 5% discount
(5_000_000, 5.0), # 5M+ impressions: 5% discount
]

# Tiers eligible for volume discounts
VOLUME_ELIGIBLE_TIERS: frozenset[AccessTier] = frozenset({
AccessTier.AGENCY,
AccessTier.ADVERTISER,
})
VOLUME_ELIGIBLE_TIERS: frozenset[AccessTier] = frozenset(
{
AccessTier.AGENCY,
AccessTier.ADVERTISER,
}
)

def calculate(
self,
Expand Down
19 changes: 6 additions & 13 deletions src/ad_buyer/clients/capability_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,7 @@ def invalidate(self, seller_endpoint: str | None = None) -> None:
return
self._cache.pop(self._cache_key(seller_endpoint), None)

async def discover_capabilities(
self, seller_endpoint: str
) -> CapabilityDiscoveryResult:
async def discover_capabilities(self, seller_endpoint: str) -> CapabilityDiscoveryResult:
"""Discover a seller's audience capabilities.

Hits the cache first, returns immediately on a fresh hit. On a
Expand Down Expand Up @@ -275,8 +273,7 @@ async def discover_capabilities(
await client.aclose()
except (httpx.HTTPError, ValueError) as exc:
logger.warning(
"capability_client fetch failed endpoint=%s err=%s -- "
"treating as legacy",
"capability_client fetch failed endpoint=%s err=%s -- treating as legacy",
seller_endpoint,
exc,
)
Expand All @@ -288,8 +285,7 @@ async def discover_capabilities(

if response.status_code != 200:
logger.warning(
"capability_client non-200 endpoint=%s status=%d -- "
"treating as legacy",
"capability_client non-200 endpoint=%s status=%d -- treating as legacy",
seller_endpoint,
response.status_code,
)
Expand All @@ -304,8 +300,7 @@ async def discover_capabilities(
payload = response.json()
except ValueError as exc:
logger.warning(
"capability_client invalid JSON endpoint=%s err=%s -- "
"treating as legacy",
"capability_client invalid JSON endpoint=%s err=%s -- treating as legacy",
seller_endpoint,
exc,
)
Expand All @@ -322,8 +317,7 @@ async def discover_capabilities(
# fallback -- standard segments only, no constraints, no
# extensions, no exclusions, no agentic.
logger.info(
"capability_client legacy seller (no audience_capabilities) "
"endpoint=%s",
"capability_client legacy seller (no audience_capabilities) endpoint=%s",
seller_endpoint,
)
caps = _legacy_default_capabilities()
Expand Down Expand Up @@ -356,8 +350,7 @@ async def discover_capabilities(
self._store(key, caps, fetched_at=now, max_age=max_age)

logger.info(
"capability_client %s endpoint=%s schema=%s agentic=%s "
"supports=(c=%s,e=%s,x=%s)",
"capability_client %s endpoint=%s schema=%s agentic=%s supports=(c=%s,e=%s,x=%s)",
cache_status,
seller_endpoint,
caps.schema_version,
Expand Down
65 changes: 34 additions & 31 deletions src/ad_buyer/clients/deals_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@
# Code-internal naming continues to use `ucp_*` (no rename per §5.6 lock).
_UCP_CONTENT_TYPE = "application/vnd.ucp.embedding+json; v=1"
_AGENTIC_AUDIENCES_CONTENT_TYPE = "application/vnd.iab.agentic-audiences+json; v=1"
_AUDIENCE_PLAN_ACCEPT = (
f"{_UCP_CONTENT_TYPE}, {_AGENTIC_AUDIENCES_CONTENT_TYPE}"
)
_AUDIENCE_PLAN_ACCEPT = f"{_UCP_CONTENT_TYPE}, {_AGENTIC_AUDIENCES_CONTENT_TYPE}"


class DealsClientError(Exception):
Expand Down Expand Up @@ -389,7 +387,10 @@ async def _request_with_retry(
if attempt < self._max_retries:
logger.warning(
"Timeout on attempt %d/%d for %s %s",
attempt, self._max_retries, method, path,
attempt,
self._max_retries,
method,
path,
)
continue
raise last_error from exc
Expand All @@ -416,7 +417,11 @@ async def _request_with_retry(
if attempt < self._max_retries:
logger.warning(
"Retryable error %d on attempt %d/%d for %s %s",
response.status_code, attempt, self._max_retries, method, path,
response.status_code,
attempt,
self._max_retries,
method,
path,
)
continue
raise last_error
Expand Down Expand Up @@ -460,16 +465,10 @@ def _build_error_from_response(response: httpx.Response) -> DealsClientError:
if isinstance(inner, dict):
error_code = str(inner.get("error", "") or "")
# Surface the inner "message" / "detail" / repr for humans.
detail = str(
inner.get("message")
or inner.get("detail")
or ""
)
detail = str(inner.get("message") or inner.get("detail") or "")
raw_unsupported = inner.get("unsupported")
if isinstance(raw_unsupported, list):
unsupported = [
u for u in raw_unsupported if isinstance(u, dict)
]
unsupported = [u for u in raw_unsupported if isinstance(u, dict)]
else:
# Flat shape: {"error": "...", "detail": "..."}
error_code = str(data.get("error", "") or "")
Expand Down Expand Up @@ -511,15 +510,19 @@ def _persist_quote(self, quote: QuoteResponse, request: QuoteRequest) -> None:
deal_type=request.deal_type,
status="quoted",
price=quote.pricing.final_cpm if quote.pricing.final_cpm is not None else 0.0,
original_price=quote.pricing.base_cpm if quote.pricing.base_cpm is not None else 0.0, # noqa: E501
original_price=quote.pricing.base_cpm
if quote.pricing.base_cpm is not None
else 0.0, # noqa: E501
impressions=quote.terms.impressions,
flight_start=quote.terms.flight_start,
flight_end=quote.terms.flight_end,
metadata=json.dumps({
"quote_id": quote.quote_id,
"buyer_tier": quote.buyer_tier,
"expires_at": quote.expires_at,
}),
metadata=json.dumps(
{
"quote_id": quote.quote_id,
"buyer_tier": quote.buyer_tier,
"expires_at": quote.expires_at,
}
),
)
except (sqlite3.Error, OSError, ValueError, AttributeError):
logger.exception("Failed to persist quote %s to DealStore", quote.quote_id)
Expand All @@ -544,15 +547,17 @@ def _persist_deal(self, deal: DealResponse) -> None:
impressions=deal.terms.impressions,
flight_start=deal.terms.flight_start,
flight_end=deal.terms.flight_end,
metadata=json.dumps({
"quote_id": deal.quote_id,
"buyer_tier": deal.buyer_tier,
"expires_at": deal.expires_at,
"activation_instructions": deal.activation_instructions,
"openrtb_params": (
deal.openrtb_params.model_dump() if deal.openrtb_params else None
),
}),
metadata=json.dumps(
{
"quote_id": deal.quote_id,
"buyer_tier": deal.buyer_tier,
"expires_at": deal.expires_at,
"activation_instructions": deal.activation_instructions,
"openrtb_params": (
deal.openrtb_params.model_dump() if deal.openrtb_params else None
),
}
),
)
except (sqlite3.Error, OSError, ValueError, AttributeError):
logger.exception("Failed to persist deal %s to DealStore", deal.deal_id)
Expand All @@ -577,6 +582,4 @@ def _update_stored_deal_status(self, deal: DealResponse) -> None:
)
break
except (sqlite3.Error, OSError, ValueError, AttributeError):
logger.exception(
"Failed to update stored deal status for %s", deal.deal_id
)
logger.exception("Failed to update stored deal status for %s", deal.deal_id)
3 changes: 1 addition & 2 deletions src/ad_buyer/clients/openrtb_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ def build_openrtb_audience_targeting(
if agentic_refs:
if not enable_agentic_ext:
logger.warning(
"openrtb_builder skipping agentic refs: "
"enable_agentic_openrtb_ext flag disabled",
"openrtb_builder skipping agentic refs: enable_agentic_openrtb_ext flag disabled",
extra={
"openrtb_drop": {
"reason": "agentic_ext_feature_flag_disabled",
Expand Down
4 changes: 1 addition & 3 deletions src/ad_buyer/clients/seller_order_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,5 @@ async def get_order_history(self, order_id: str) -> dict[str, Any] | None:
)
return None
except (httpx.RequestError, OSError) as e:
logger.error(
"Failed to reach seller for order %s history: %s", order_id, e
)
logger.error("Failed to reach seller for order %s history: %s", order_id, e)
return None
14 changes: 7 additions & 7 deletions src/ad_buyer/clients/ucp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@
UCP_CONTENT_TYPE = "application/vnd.ucp.embedding+json; v=1"

# Embedding provenance literal -- mirrors ComplianceContext.embedding_provenance.
EmbeddingProvenance = Literal[
"mock", "local_buyer", "advertiser_supplied", "hosted_external"
]
EmbeddingProvenance = Literal["mock", "local_buyer", "advertiser_supplied", "hosted_external"]

# Local model details for "local" / "hybrid" embedding modes.
# Locked in docs/decisions/EMBEDDING_STRATEGY_2026-04-25.md (E2-1).
Expand All @@ -54,10 +52,10 @@
# follow the same convention as the buyer's local model. Re-derive these
# from `ad_buyer.eval.evaluate_embedding_modes()` whenever the model swaps.
_SIMILARITY_THRESHOLDS: dict[str, dict[str, float]] = {
"mock": {"strong": 0.85, "moderate": 0.65, "weak": 0.40},
"local": {"strong": 0.70, "moderate": 0.50, "weak": 0.30},
"advertiser": {"strong": 0.70, "moderate": 0.50, "weak": 0.30},
"hybrid": {"strong": 0.70, "moderate": 0.50, "weak": 0.30},
"mock": {"strong": 0.85, "moderate": 0.65, "weak": 0.40},
"local": {"strong": 0.70, "moderate": 0.50, "weak": 0.30},
"advertiser": {"strong": 0.70, "moderate": 0.50, "weak": 0.30},
"hybrid": {"strong": 0.70, "moderate": 0.50, "weak": 0.30},
}
_DEFAULT_THRESHOLDS = _SIMILARITY_THRESHOLDS["mock"]

Expand All @@ -69,6 +67,7 @@ def _similarity_thresholds_for_mode() -> dict[str, float]:

return _SIMILARITY_THRESHOLDS.get(settings.embedding_mode, _DEFAULT_THRESHOLDS)


# Process-wide cached SentenceTransformer instance. Lazy-loaded on first
# use to avoid paying ~80MB model download cost at import time.
_LOCAL_MODEL: Any = None
Expand All @@ -88,6 +87,7 @@ def _get_local_embedding_model() -> Any:
return None
try:
from sentence_transformers import SentenceTransformer # type: ignore

_LOCAL_MODEL = SentenceTransformer(LOCAL_EMBEDDING_MODEL_NAME)
return _LOCAL_MODEL
except Exception as exc: # ImportError, network errors, etc.
Expand Down
16 changes: 10 additions & 6 deletions src/ad_buyer/clients/unified_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,7 @@ async def call_tool(
"list_creatives": "List all creatives",
# Create tools (args required)
"create_account": lambda a: (
f"Create an account named '{a.get('name')}' "
f"of type {a.get('type', 'advertiser')}"
f"Create an account named '{a.get('name')}' of type {a.get('type', 'advertiser')}"
),
"create_order": lambda a: (
f"Create an order named '{a.get('name')}' "
Expand Down Expand Up @@ -544,7 +543,6 @@ async def get_pricing(
if result.data and isinstance(result.data, dict):
base_price = result.data.get("basePrice", result.data.get("price"))
if isinstance(base_price, (int, float)) and self.buyer_identity:

tier_obj = self.buyer_identity.get_access_tier()
discount = self.buyer_identity.get_discount_percentage()

Expand All @@ -559,7 +557,9 @@ async def get_pricing(

result.data["pricing"] = {
"base_price": pricing.base_price,
"tiered_price": round(pricing.final_price, 2) if pricing.final_price is not None else None, # noqa: E501
"tiered_price": round(pricing.final_price, 2)
if pricing.final_price is not None
else None, # noqa: E501
"tier": tier_obj.value if self.buyer_identity else "public",
"tier_discount": discount if self.buyer_identity else 0,
"volume_discount": pricing.volume_discount,
Expand All @@ -572,8 +572,12 @@ async def get_pricing(
result.data["pricing"] = {
"base_price": None,
"tiered_price": None,
"tier": self.buyer_identity.get_access_tier().value if self.buyer_identity else "public", # noqa: E501
"tier_discount": self.buyer_identity.get_discount_percentage() if self.buyer_identity else 0, # noqa: E501
"tier": self.buyer_identity.get_access_tier().value
if self.buyer_identity
else "public", # noqa: E501
"tier_discount": self.buyer_identity.get_discount_percentage()
if self.buyer_identity
else 0, # noqa: E501
"volume_discount": 0.0,
"requested_volume": volume,
"deal_type": deal_type,
Expand Down
4 changes: 1 addition & 3 deletions src/ad_buyer/crews/channel_crews.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,7 @@ def kickoff_channel_crew_with_audience(
factory = _CHANNEL_FACTORIES.get(channel)
if factory is None:
valid = sorted(_CHANNEL_FACTORIES.keys())
raise ValueError(
f"Unknown channel {channel!r}; expected one of {valid}"
)
raise ValueError(f"Unknown channel {channel!r}; expected one of {valid}")

# If the caller passed a CampaignBrief but no plan, run the planner
# step inline. The import is local because the planner module pulls
Expand Down
5 changes: 1 addition & 4 deletions src/ad_buyer/data/taxonomy_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,7 @@ def validate_ref(ref: AudienceRef) -> ValidationResult:
if entry is None:
return ValidationResult(
valid=False,
reason=(
f"identifier {ref.identifier!r} not found in "
f"{ref.taxonomy} v{ref.version}"
),
reason=(f"identifier {ref.identifier!r} not found in {ref.taxonomy} v{ref.version}"),
)
if ref.type == "agentic":
# Agentic loader returns a stub, not a real validation.
Expand Down
Loading
Loading