diff --git a/config/retailers.py b/config/retailers.py index fe828187..c8d1d339 100644 --- a/config/retailers.py +++ b/config/retailers.py @@ -13,7 +13,7 @@ "source_name": "a101", "currency": "TRY", "categories": { - "fruit_veg": "meyve-ve-sebze", + "fruit_veg": "meyve-sebze", }, - } + }, } diff --git a/pipeline/loaders_fact.py b/pipeline/loaders_fact.py index 03d16861..59815bfb 100644 --- a/pipeline/loaders_fact.py +++ b/pipeline/loaders_fact.py @@ -43,9 +43,19 @@ def insert_fact_observation( if not can_insert: logger.info( - "Skipping fact insert — product=%r reason=%s", + ( + "DEBUG - FACT SKIP | product=%r | reason=%s | " + "price=%r | normalized_unit=%r | normalized_quantity=%r | " + "price_per_unit=%r | standardized_product_name=%r | category_name=%r" + ), product.get("product_name"), reason, + transformed.get("price"), + transformed.get("normalized_unit"), + transformed.get("normalized_quantity"), + transformed.get("price_per_unit"), + transformed.get("standardized_product_name"), + transformed.get("category_name"), ) return False @@ -127,4 +137,11 @@ def insert_fact_observation( ), ) - return cursor.rowcount == 1 \ No newline at end of file + if cursor.rowcount != 1: + logger.info( + "DEBUG - FACT NOT INSERTED | product=%r | reason=conflict_or_no_insert | event_id=%r", + product.get("product_name"), + event_id, + ) + + return cursor.rowcount == 1 diff --git a/pipeline/run_a101_pipeline.py b/pipeline/run_a101_pipeline.py index 97b0ad9d..201de234 100644 --- a/pipeline/run_a101_pipeline.py +++ b/pipeline/run_a101_pipeline.py @@ -23,7 +23,19 @@ def run_pipeline(category_key: str): - category_slug = RETAILER_CONFIG["a101"]["categories"][category_key] + base_category_slug = RETAILER_CONFIG["a101"]["categories"][category_key] + + if category_key == "fruit_veg": + category_slugs = [ + f"{base_category_slug}/meyve", + f"{base_category_slug}/sebze", + f"{base_category_slug}/yesillik", + ] + else: + category_slugs = [base_category_slug] + + logger.info("DEBUG - 
base_category_slug=%s", base_category_slug) + logger.info("DEBUG - category_slugs=%s", category_slugs) conn = None run_id = None @@ -32,9 +44,50 @@ def run_pipeline(category_key: str): # ------------------------- # 1) SCRAPE # ------------------------- - products = get_a101_category_products(category_slug) + products = [] + seen_product_names = set() + + for slug in category_slugs: + logger.info("DEBUG - Scraping A101 subcategory: %s", slug) + subcategory_products = get_a101_category_products(slug) + + logger.info( + "DEBUG - A101 subcategory %s returned %d products", + slug, + len(subcategory_products), + ) + + if subcategory_products: + logger.info( + "DEBUG - First 3 products from %s: %s", + slug, + [p.get("product_name") for p in subcategory_products[:3]], + ) + + for product in subcategory_products: + product_name = (product.get("product_name") or "").strip().lower() + if not product_name: + continue + + if product_name in seen_product_names: + continue + + seen_product_names.add(product_name) + products.append(product) + logger.info("A101 scraped %d products", len(products)) + if products: + logger.info("A101 first 5 products preview:") + for product in products[:5]: + logger.info( + "product_name=%r shown_price_tl=%r unit=%r unit_amount=%r", + product.get("product_name"), + product.get("shown_price_tl"), + product.get("unit"), + product.get("unit_amount"), + ) + # ------------------------- # 2) DB CONNECT # ------------------------- @@ -45,12 +98,26 @@ def run_pipeline(category_key: str): cur, source_name=source_name, category_key=category_key, - category_slug=category_slug, + category_slug=base_category_slug, triggered_by="local_test", pipeline_version="v2-a101", ) conn.commit() + # Eğer 0 ürün geldiyse başarılı sayma + if not products: + with conn.cursor() as cur: + fail_run( + cur, + run_id, + f"A101 scraper returned 0 products for category_key={category_key} category_slug={base_category_slug}", + ) + conn.commit() + + raise RuntimeError( + f"A101 scraper 
def normalize_unit(
    unit: Optional[str], quantity: Any, product_name: Optional[str] = None
) -> Tuple[Optional[str], Optional[float]]:
    """Normalize a scraped unit/quantity pair to a base unit and amount.

    Resolution order:

    1. If ``unit`` is given it wins. ``GRAM`` and ``ML`` are converted to
       ``kg`` / ``liter`` (amount divided by 1000, rounded to 4 places);
       ``KG``, ``PIECE`` and ``LITER`` keep their amount and default to 1.0
       when no amount is available. Any other unit is returned lower-cased
       and stripped, with the parsed amount unchanged.
    2. If ``unit`` is ``None`` the unit is inferred from ``product_name``
       ("2 kg", "1,5kg", "500 g", "250 gr", "1 l", "1 lt", "200 ml",
       "adet").

    Args:
        unit: Unit label from the scraper, or ``None`` when unknown.
        quantity: Amount expressed in ``unit``; anything ``float()`` cannot
            convert is treated as missing.
        product_name: Display name used as a fallback source when ``unit``
            is ``None``.

    Returns:
        ``(normalized_unit, normalized_quantity)``; ``(None, None)`` when
        nothing could be determined.
    """
    # Function-local import so this block does not depend on the module's
    # import list already containing ``re``.
    import re

    qty: Optional[float] = None
    if quantity is not None:
        try:
            qty = float(quantity)
        except (TypeError, ValueError):
            qty = None

    # 1) An explicit unit always takes precedence over the product name.
    if unit is not None:
        unit_key = str(unit).strip().upper()

        # Sub-units are expressed in their base unit (1000 g = 1 kg, ...).
        scaled_units = {"GRAM": "kg", "ML": "liter"}
        if unit_key in scaled_units:
            scaled = None if qty is None else round(qty / 1000, 4)
            return scaled_units[unit_key], scaled

        base_units = {"KG": "kg", "PIECE": "piece", "LITER": "liter"}
        if unit_key in base_units:
            return base_units[unit_key], qty if qty is not None else 1.0

        # Unknown unit: pass it through, normalized the same way it was
        # matched above (the previous ``unit.lower()`` raised on non-str
        # values and kept surrounding whitespace).
        return str(unit).strip().lower(), qty

    # 2) No unit: try to infer it from the product name.
    name = (product_name or "").lower().strip()
    if not name:
        return None, None

    # Fold Turkish letters to ASCII so the patterns below stay simple.
    name = name.translate(str.maketrans("ığüşöç", "igusoc"))

    def amount_before(suffix: str) -> Optional[float]:
        """Return the number directly preceding ``suffix`` in the name."""
        match = re.search(r"(\d+(?:[.,]\d+)?)\s*(?:" + suffix + r")\b", name)
        if match is None:
            return None
        return float(match.group(1).replace(",", "."))

    # "2 kg" / "1,5kg": keep the stated quantity instead of assuming 1.
    kilos = amount_before("kg")
    if kilos is not None:
        return "kg", kilos

    # Bare "kg" with no number means the product is priced per kilogram.
    if re.search(r"\bkg\b", name):
        return "kg", 1.0

    grams = amount_before("gr|g")
    if grams is not None:
        return "kg", round(grams / 1000, 4)

    liters = amount_before("lt|l")
    if liters is not None:
        return "liter", liters

    milliliters = amount_before("ml")
    if milliliters is not None:
        return "liter", round(milliliters / 1000, 4)

    if re.search(r"\badet\b", name):
        return "piece", 1.0

    return None, None
def normalize_category_name(category_slug: str) -> str:
    """Return a display category name for an A101 category slug.

    Only the last path segment of the slug is used, so a sub-category slug
    such as ``"meyve-sebze/meyve"`` yields ``"Meyve"`` and a slug without a
    sub-category such as ``"meyve-sebze"`` yields ``"Meyve Sebze"``.
    Surrounding whitespace and leading/trailing slashes are ignored.

    Args:
        category_slug: Category path as used in the A101 URL.

    Returns:
        The known display name for the segment, or the segment with dashes
        replaced by spaces and title-cased.
    """
    # Known sub-categories and the names they are stored under. "Yesillik"
    # is kept ASCII exactly as before so stored category values do not change.
    known_names = {
        "meyve": "Meyve",
        "sebze": "Sebze",
        "yesillik": "Yesillik",
    }

    # A trailing slash previously defeated the ``endswith`` checks, and any
    # other sub-category leaked its parent path into the name.
    leaf = category_slug.strip().strip("/").rsplit("/", 1)[-1]

    if leaf in known_names:
        return known_names[leaf]
    return leaf.replace("-", " ").title()
name.lower(): - return False +def get_section_header(category_slug: str) -> str | None: + if category_slug.endswith("/meyve"): + return "Meyve" + if category_slug.endswith("/sebze"): + return "Sebze" + if category_slug.endswith("/yesillik"): + return "Yeşillik" + return None - return True +def is_price_line(text: str) -> bool: + return bool(re.match(r"^₺\s*[\d\.]+(?:,\d{1,2})?$", text)) + + +def find_best_section_start(lines: list[str], section_header: str) -> int | None: + """ + Same header appears multiple times in the page. + Choose the occurrence that is followed by real product rows (price lines). + """ + candidate_indices = [i for i, line in enumerate(lines) if line == section_header] + + if not candidate_indices: + return None + + best_idx = None + best_score = -1 + + for idx in candidate_indices: + window = lines[idx : idx + 80] + score = sum(1 for line in window if is_price_line(line)) + + # Prefer the first strong candidate with nearby prices + if score > best_score: + best_score = score + best_idx = idx + + if best_score <= 0: + return None + + return best_idx + + +def parse_products_from_body_text(body_text: str, category_slug: str): + lines = [line.strip() for line in body_text.splitlines() if line.strip()] + + section_header = get_section_header(category_slug) + if not section_header: + print(f"DEBUG - section header missing for category_slug={category_slug}") + return [] + + start_idx = find_best_section_start(lines, section_header) + if start_idx is None: + print(f"DEBUG - section header not found or no priced rows nearby: {section_header}") + return [] + + stop_headers = {"Meyve", "Sebze", "Yeşillik", "Sepetim", "Giriş Yap", "Site Haritası"} + + section_lines = [] + for line in lines[start_idx + 1 :]: + if line in stop_headers and line != section_header: + break + section_lines.append(line) + + print(f"DEBUG - section_header={section_header}") + print(f"DEBUG - section_start_idx={start_idx}") + print(f"DEBUG - 
section_lines_sample={section_lines[:40]}") + + products = [] + seen_names = set() + + i = 0 + while i < len(section_lines) - 1: + name = section_lines[i] + next_line = section_lines[i + 1] + + if is_price_line(next_line): + lowered = name.lower() + + # obvious non-product lines + if any( + x in lowered + for x in [ + "kampanyalar", + "giriş yap", + "sepetim", + "anasayfa", + "site haritası", + "yardım", + "iletişim", + "kategoriler", + ] + ): + i += 1 + continue + + raw_price = next_line.replace("₺", "").replace(".", "").replace(",", ".").strip() -def parse_price_from_lines(lines): - for line in lines: - if "₺" in line: - raw_price = ( - line.replace("₺", "") - .replace(".", "") - .replace(",", ".") - .strip() - ) try: - return float(raw_price) + price = float(raw_price) except ValueError: + i += 1 + continue + + normalized_name = name.lower().strip() + if normalized_name in seen_names: + i += 2 continue - return None + + seen_names.add(normalized_name) + + extracted_unit, extracted_amount = extract_unit_info(name) + + products.append( + { + "product_id": f"a101_{category_slug}_{len(products)}", + "product_name": name, + "sku": f"a101_{category_slug}_{len(products)}", + "shown_price_tl": price, + "regular_price_tl": price, + "discount_rate": None, + "product_url": f"https://www.a101.com.tr/kapida/{category_slug}/", + "brand_name": None, + "category_name": normalize_category_name(category_slug), + "unit": extracted_unit, + "unit_amount": extracted_amount, + } + ) + + i += 2 + continue + + i += 1 + + return products def get_a101_products(category_slug: str): @@ -68,11 +178,38 @@ def get_a101_products(category_slug: str): products = [] with sync_playwright() as p: - browser = p.chromium.launch(headless=False) - page = browser.new_page() - page.goto(url, timeout=60000) + browser = p.chromium.launch( + headless=True, + args=[ + "--disable-blink-features=AutomationControlled", + "--no-sandbox", + "--disable-dev-shm-usage", + ], + ) + + context = browser.new_context( + 
viewport={"width": 1440, "height": 2200}, + user_agent=( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/123.0.0.0 Safari/537.36" + ), + locale="tr-TR", + ) + + page = context.new_page() + + page.add_init_script(""" + Object.defineProperty(navigator, 'webdriver', { + get: () => undefined + }); + """) + + print(f"DEBUG - CATEGORY URL: {url}") + + page.goto(url, timeout=60000, wait_until="domcontentloaded") + page.wait_for_timeout(5000) - # Konum popup for text in [ "Bu defalık izin ver", "Siteyi ziyaret ederken izin ver", @@ -80,72 +217,36 @@ def get_a101_products(category_slug: str): ]: try: page.get_by_text(text, exact=True).click(timeout=3000) + print(f"DEBUG - clicked location popup button: {text}") break except Exception: pass - # Cookie popup for text in ["KABUL ET", "Kabul Et", "Tümünü Kabul Et"]: try: page.get_by_text(text, exact=True).click(timeout=3000) + print(f"DEBUG - clicked cookie popup button: {text}") break except Exception: pass page.wait_for_timeout(4000) - # Lazy loading için scroll - for _ in range(8): - page.mouse.wheel(0, 2500) + for i in range(10): + page.mouse.wheel(0, 3000) page.wait_for_timeout(1200) + print(f"DEBUG - scroll step {i + 1}") - cards = page.locator("div[class*='product']").all() + body_text = page.locator("body").inner_text() + print(f"DEBUG - BODY TEXT SAMPLE: {body_text[:2000]}") - for i, card in enumerate(cards): - try: - text_blob = card.inner_text().strip() - if not text_blob: - continue - - lines = [x.strip() for x in text_blob.splitlines() if x.strip()] - if not lines: - continue - - price = parse_price_from_lines(lines) - if price is None: - continue - - name = None - for line in lines: - if "₺" not in line and len(line) > 2: - if is_valid_product_name(line): - name = line - break - - if not name: - continue - - extracted_unit, extracted_amount = extract_unit_info(name) - - products.append( - { - "product_id": f"a101_{i}", - "product_name": name, - "sku": 
f"a101_{i}", - "shown_price_tl": price, - "regular_price_tl": price, - "discount_rate": None, - "product_url": url, - "brand_name": None, - "category_name": "Meyve ve Sebze", - "unit": extracted_unit, - "unit_amount": extracted_amount, - } - ) + products = parse_products_from_body_text(body_text, category_slug) - except Exception: - continue + print(f"DEBUG - TOTAL PRODUCTS RETURNED: {len(products)}") + if products: + print(f"DEBUG - FIRST 10 PRODUCTS: {[p['product_name'] for p in products[:10]]}") + context.close() browser.close() - return products \ No newline at end of file + return products