Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/retailers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"source_name": "a101",
"currency": "TRY",
"categories": {
"fruit_veg": "meyve-ve-sebze",
"fruit_veg": "meyve-sebze",
},
}
},
}
21 changes: 19 additions & 2 deletions pipeline/loaders_fact.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,19 @@ def insert_fact_observation(

if not can_insert:
logger.info(
"Skipping fact insert — product=%r reason=%s",
(
"DEBUG - FACT SKIP | product=%r | reason=%s | "
"price=%r | normalized_unit=%r | normalized_quantity=%r | "
"price_per_unit=%r | standardized_product_name=%r | category_name=%r"
),
product.get("product_name"),
reason,
transformed.get("price"),
transformed.get("normalized_unit"),
transformed.get("normalized_quantity"),
transformed.get("price_per_unit"),
transformed.get("standardized_product_name"),
transformed.get("category_name"),
)
return False

Expand Down Expand Up @@ -127,4 +137,11 @@ def insert_fact_observation(
),
)

return cursor.rowcount == 1
if cursor.rowcount != 1:
logger.info(
"DEBUG - FACT NOT INSERTED | product=%r | reason=conflict_or_no_insert | event_id=%r",
product.get("product_name"),
event_id,
)

return cursor.rowcount == 1
88 changes: 74 additions & 14 deletions pipeline/run_a101_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,19 @@


def run_pipeline(category_key: str):
category_slug = RETAILER_CONFIG["a101"]["categories"][category_key]
base_category_slug = RETAILER_CONFIG["a101"]["categories"][category_key]

if category_key == "fruit_veg":
category_slugs = [
f"{base_category_slug}/meyve",
f"{base_category_slug}/sebze",
f"{base_category_slug}/yesillik",
]
else:
category_slugs = [base_category_slug]

logger.info("DEBUG - base_category_slug=%s", base_category_slug)
logger.info("DEBUG - category_slugs=%s", category_slugs)

conn = None
run_id = None
Expand All @@ -32,9 +44,50 @@ def run_pipeline(category_key: str):
# -------------------------
# 1) SCRAPE
# -------------------------
products = get_a101_category_products(category_slug)
products = []
seen_product_names = set()

for slug in category_slugs:
logger.info("DEBUG - Scraping A101 subcategory: %s", slug)
subcategory_products = get_a101_category_products(slug)

logger.info(
"DEBUG - A101 subcategory %s returned %d products",
slug,
len(subcategory_products),
)

if subcategory_products:
logger.info(
"DEBUG - First 3 products from %s: %s",
slug,
[p.get("product_name") for p in subcategory_products[:3]],
)

for product in subcategory_products:
product_name = (product.get("product_name") or "").strip().lower()
if not product_name:
continue

if product_name in seen_product_names:
continue

seen_product_names.add(product_name)
products.append(product)

logger.info("A101 scraped %d products", len(products))

if products:
logger.info("A101 first 5 products preview:")
for product in products[:5]:
logger.info(
"product_name=%r shown_price_tl=%r unit=%r unit_amount=%r",
product.get("product_name"),
product.get("shown_price_tl"),
product.get("unit"),
product.get("unit_amount"),
)

# -------------------------
# 2) DB CONNECT
# -------------------------
Expand All @@ -45,12 +98,26 @@ def run_pipeline(category_key: str):
cur,
source_name=source_name,
category_key=category_key,
category_slug=category_slug,
category_slug=base_category_slug,
triggered_by="local_test",
pipeline_version="v2-a101",
)
conn.commit()

# Eğer 0 ürün geldiyse başarılı sayma
if not products:
with conn.cursor() as cur:
fail_run(
cur,
run_id,
f"A101 scraper returned 0 products for category_key={category_key} category_slug={base_category_slug}",
)
conn.commit()

raise RuntimeError(
f"A101 scraper returned 0 products for category_key={category_key} category_slug={base_category_slug}"
)

raw_count = 0
stg_count = 0
fact_count = 0
Expand All @@ -62,17 +129,15 @@ def run_pipeline(category_key: str):
for product in products:
try:
with conn.cursor() as cur:
# RAW
event_id = insert_raw_event(
cur,
run_id=run_id,
product=product,
category_slug=category_slug,
category_slug=base_category_slug,
source_name=source_name,
currency=currency,
)

# STG SOURCE
insert_stg_source_product(
cur,
event_id=event_id,
Expand All @@ -81,17 +146,14 @@ def run_pipeline(category_key: str):
source_name=source_name,
)

# TRANSFORM
transformed = transform_product(product)

# DIM
product_id = get_or_create_product_id(
cur,
transformed["standardized_product_name"],
transformed.get("category_name"),
)

# STG NORMALIZED
insert_stg_normalized_observation(
cur,
event_id,
Expand All @@ -101,7 +163,6 @@ def run_pipeline(category_key: str):
source_name=source_name,
)

# STG OBS
observation_id = insert_stg_observation(
cur,
event_id,
Expand All @@ -112,7 +173,6 @@ def run_pipeline(category_key: str):
currency=currency,
)

# FACT
inserted = insert_fact_observation(
cur,
observation_id,
Expand Down Expand Up @@ -167,20 +227,20 @@ def run_pipeline(category_key: str):
if conn:
try:
conn.rollback()
except:
except Exception:
pass

if conn and run_id:
try:
with conn.cursor() as cur:
fail_run(cur, run_id, str(e))
conn.commit()
except:
except Exception:
pass

logger.exception("A101 pipeline failed: %s", e)
raise

finally:
if conn:
conn.close()
conn.close()
75 changes: 61 additions & 14 deletions pipeline/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,72 @@


def normalize_unit(
unit: Optional[str], quantity: Any
unit: Optional[str], quantity: Any, product_name: Optional[str] = None
) -> Tuple[Optional[str], Optional[float]]:
if unit is None:
return None, None

unit_upper = str(unit).strip().upper()

qty: Optional[float] = None
if quantity is not None:
try:
qty = float(quantity)
except (TypeError, ValueError):
qty = None

if unit_upper == "GRAM":
if qty is None:
return "kg", None
return "kg", round(qty / 1000, 4)
# 1) Unit varsa önce onu kullan
if unit is not None:
unit_upper = str(unit).strip().upper()

if unit_upper == "GRAM":
if qty is None:
return "kg", None
return "kg", round(qty / 1000, 4)

if unit_upper == "KG":
return "kg", qty if qty is not None else 1.0

if unit_upper == "PIECE":
return "piece", qty if qty is not None else 1.0

if unit_upper == "LITER":
return "liter", qty if qty is not None else 1.0

if unit_upper == "ML":
if qty is None:
return "liter", None
return "liter", round(qty / 1000, 4)

return unit.lower(), qty

# 2) Unit yoksa product_name'den çözmeye çalış
name = (product_name or "").lower().strip()

if not name:
return None, None

tr_map = {"ı": "i", "ğ": "g", "ü": "u", "ş": "s", "ö": "o", "ç": "c"}
for old, new in tr_map.items():
name = name.replace(old, new)

if re.search(r"\bkg\b", name):
return "kg", 1.0

if unit_upper == "PIECE":
return "piece", qty if qty is not None else 1.0
gram_match = re.search(r"(\d+(?:[.,]\d+)?)\s*g\b", name)
if gram_match:
grams = float(gram_match.group(1).replace(",", "."))
return "kg", round(grams / 1000, 4)

return unit.lower(), qty
liter_match = re.search(r"(\d+(?:[.,]\d+)?)\s*l\b", name)
if liter_match:
liters = float(liter_match.group(1).replace(",", "."))
return "liter", liters

ml_match = re.search(r"(\d+(?:[.,]\d+)?)\s*ml\b", name)
if ml_match:
ml = float(ml_match.group(1).replace(",", "."))
return "liter", round(ml / 1000, 4)

if re.search(r"\badet\b", name):
return "piece", 1.0

return None, None


def standardize_product_name(product_name: Optional[str]) -> Optional[str]:
Expand Down Expand Up @@ -106,7 +149,11 @@ def transform_product(product: dict[str, Any]) -> dict[str, Any]:
unit = product.get("unit")
unit_amount = product.get("unit_amount")

normalized_unit, normalized_quantity = normalize_unit(unit, unit_amount)
normalized_unit, normalized_quantity = normalize_unit(
unit,
unit_amount,
product.get("product_name"),
)
price_per_unit = calculate_price_per_unit(price, normalized_quantity)
unit_price_label = build_unit_price_label(normalized_unit)
standardized_product_name = standardize_product_name(product.get("product_name"))
Expand Down
Loading
Loading