From 53eb7693190ed63436dd5f9a6e50921cf877dd73 Mon Sep 17 00:00:00 2001 From: Mohammed Taha Khan Date: Fri, 26 Jun 2026 11:31:36 +0200 Subject: [PATCH] Improve harvester log grouping and error reporting --- site/cds_rdm/ext.py | 59 ++ .../harvester_download/resources/resource.py | 13 +- site/cds_rdm/harvester_runs/logs.py | 721 ++++++++++++++++-- site/cds_rdm/inspire_harvester/writer.py | 302 ++++++-- .../harvester_download/report_body.html | 122 ++- 5 files changed, 1082 insertions(+), 135 deletions(-) diff --git a/site/cds_rdm/ext.py b/site/cds_rdm/ext.py index 17d9dcd7..668a5eb4 100644 --- a/site/cds_rdm/ext.py +++ b/site/cds_rdm/ext.py @@ -6,6 +6,14 @@ # the terms of the GPL-2.0 License; see LICENSE file for more details. """CDS-RDM module.""" +from datetime import datetime + +from flask import current_app +from invenio_jobs.logging.jobs import ContextAwareOSHandler, job_context +from invenio_jobs.services import JobLogEntrySchema +from invenio_search import current_search_client +from invenio_search.utils import prefix_index + from cds_rdm.clc_sync.resources.config import CLCSyncResourceConfig from cds_rdm.clc_sync.resources.resource import CLCSyncResource from cds_rdm.clc_sync.resources.utils import get_clc_sync_entry @@ -21,6 +29,44 @@ from .views import get_linked_records_search_query +class CDSContextAwareOSHandler(ContextAwareOSHandler): + """Job log handler that preserves selected structured fields.""" + + _extra_fields = ( + "entry_id", + "inspire_id", + "record_pid", + "report_group_key", + "report_kind", + "report_reason", + "skip_sentry", + ) + + def enrich_log(self, record): + """Enrich log record with context and supported extra fields.""" + context = dict(job_context.get()) + log_data = { + "timestamp": datetime.now().isoformat(), + "level": record.levelname, + "message": record.getMessage(), + "module": record.module, + "function": record.funcName, + "line": record.lineno, + "context": context, + } + serialized_data = JobLogEntrySchema().load(log_data) + for field_name in self._extra_fields: + value = getattr(record, field_name, None) + if value is not None: + serialized_data[field_name] = value + return serialized_data + + def index_in_os(self, log_data): + """Send log data to OpenSearch.""" + full_index_name = prefix_index(current_app.config["JOBS_LOGGING_INDEX"]) + current_search_client.index(index=full_index_name, body=log_data) + + class CDS_RDM_App(object): """CDS-RDM App.""" @@ -39,6 +85,7 @@ def init_app(self, app): """Flask application initialization.""" self.init_services(app) self.init_resources(app) + self.init_job_logging(app) app.jinja_env.globals["get_clc_sync_entry"] = get_clc_sync_entry app.jinja_env.globals["evaluate_permissions"] = evaluate_permissions # Register filter for building linked records search query @@ -61,6 +108,18 @@ def init_resources(self, app): config=HarvesterDownloadResourceConfig, ) + def init_job_logging(self, app): + """Replace the default jobs log handler with the CDS-aware variant.""" + handlers = list(app.logger.handlers) + for handler in handlers: + if type(handler) is ContextAwareOSHandler: + app.logger.removeHandler(handler) + + if app.config.get("JOBS_LOGGING"): + os_handler = CDSContextAwareOSHandler() + os_handler.setLevel(app.config["JOBS_LOGGING_LEVEL"]) + app.logger.addHandler(os_handler) + class CDS_RDM_UI(object): """CDS-RDM extension.""" diff --git a/site/cds_rdm/harvester_download/resources/resource.py b/site/cds_rdm/harvester_download/resources/resource.py index f9b37c3b..22258a93 100644 --- a/site/cds_rdm/harvester_download/resources/resource.py +++ b/site/cds_rdm/harvester_download/resources/resource.py @@ -16,7 +16,7 @@ from cds_rdm.harvester_runs.logs import ( HarvesterRunError, fetch_harvester_run_logs, - lines_from_hits, + group_log_hits, plain_text_log, resolve_harvester_run, ) @@ -48,8 +48,15 @@ def download(self): raise self._http_json_error(error.message, error.code) hits, total = fetch_harvester_run_logs(run) - lines, error_count, warning_count = lines_from_hits(hits) - logs = plain_text_log(run, lines, total, error_count, warning_count) + grouped_issues, other_lines, error_count, warning_count = group_log_hits(hits) + logs = plain_text_log( + run, + grouped_issues, + other_lines, + total, + error_count, + warning_count, + ) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"harvester_logs_{run.id}_{timestamp}.log" diff --git a/site/cds_rdm/harvester_runs/logs.py b/site/cds_rdm/harvester_runs/logs.py index a49e03a9..93ddec09 100644 --- a/site/cds_rdm/harvester_runs/logs.py +++ b/site/cds_rdm/harvester_runs/logs.py @@ -7,18 +7,26 @@ """Helpers for INSPIRE harvester run logs.""" -import re +import ast import uuid +from collections import OrderedDict from datetime import datetime from flask import current_app from flask_babel import format_datetime -from invenio_access.permissions import system_identity from invenio_i18n import gettext as _ from invenio_jobs.models import Run -from invenio_jobs.proxies import current_jobs_logs_service +from invenio_search import current_search_client +from invenio_search.utils import prefix_index INSPIRE_HARVESTER_TASK = "process_inspire" +INSPIRE_LITERATURE_URL = "https://inspirehep.net/literature/" +BRACKET_INSPIRE_PREFIX = "[INSPIRE#" +BRACKET_ENTRY_PREFIX = "[entry_id=" +SKIPPED_ENTRY_PREFIX = "Skipped entry " +SKIPPED_ENTRY_DELIMITER = " with errors: " +VOCABULARY_WARNING_PREFIX = "Vocabulary term '" +VOCABULARY_WARNING_DELIMITER = "' not found in '" class HarvesterRunError(Exception): @@ -33,18 +41,525 @@ def __init__(self, message, code): def format_timestamp(value): """Format timestamps for display.""" - if value is None or value == "": + if value in (None, ""): return "N/A" if isinstance(value, datetime): dt = value else: try: dt = datetime.fromisoformat(str(value).replace("Z", "+00:00")) - except (ValueError, TypeError): + except (TypeError, ValueError): return str(value) return format_datetime(dt, "yyyy-MM-dd HH:mm") +# String helpers ------------------------------------------------------------- + +def _compact(value): + """Collapse whitespace while keeping the original text readable.""" + text = str(value or "").replace("\\n", " ").replace("\n", " ") + return " ".join(text.split()).strip() + + +def _remove_prefix(text, prefix): + """Remove a prefix when present.""" + return text[len(prefix) :] if text.startswith(prefix) else text + + +def _extract_bracket_value(text, prefix): + """Extract ``[prefixVALUE]`` blocks from a message.""" + text = str(text or "") + start = text.find(prefix) + if start == -1: + return None + value_start = start + len(prefix) + value_end = text.find("]", value_start) + if value_end == -1: + return None + value = text[value_start:value_end].strip() + return value or None + + +def _extract_identifier_after(text, marker): + """Extract an identifier that immediately follows a marker.""" + text = str(text or "") + start = text.find(marker) + if start == -1: + return None + cursor = start + len(marker) + while cursor < len(text) and text[cursor].isspace(): + cursor += 1 + + end = cursor + while end < len(text) and ( + text[end].isalnum() or text[end] in {"_", "-"} + ): + end += 1 + + value = text[cursor:end].strip() + return value or None + + +def _extract_entry_id_from_dict(text): + """Extract ``id`` from a stringified entry dictionary when possible.""" + text = str(text or "").strip() + if not text.startswith("{"): + return None + try: + parsed = ast.literal_eval(text) + except (ValueError, SyntaxError): + return None + if isinstance(parsed, dict): + value = parsed.get("id") + return str(value) if value not in (None, "") else None + return None + + +def _strip_leading_bracket_prefixes(message): + """Drop leading ``[prefix]`` blocks from a log line.""" + text = _compact(message) + while text.startswith("["): + end = text.find("]") + if end == -1: + break + text = text[end + 1 :].lstrip() + return text + + +def _remove_bracket_block(text, prefix): + """Remove ``[prefixVALUE]`` blocks wherever they appear.""" + text = str(text or "") + while True: + start = text.find(prefix) + if start == -1: + return text + end = text.find("]", start + len(prefix)) + if end == -1: + return text + text = f"{text[:start]} {text[end + 1:]}" + + +def _strip_inline_inspire_ids(text): + """Remove inline ``INSPIRE#123`` and ``INSPIRE:123`` references.""" + result = [] + for token in str(text or "").split(): + cleaned = token.strip(".,;:()[]") + if cleaned.startswith(("INSPIRE#", "INSPIRE:")): + continue + result.append(token) + return " ".join(result) + + +def _is_skip_summary_message(message): + """Return True for ``Skipping N transformed entries with errors`` lines.""" + text = _compact(message).rstrip(".").lower() + return text.startswith("skipping ") and text.endswith( + " transformed entries with errors" + ) + + +def _parse_vocabulary_warning(reason): + """Extract term and vocabulary name from a vocabulary warning.""" + reason = _compact(reason) + if not reason.startswith(VOCABULARY_WARNING_PREFIX): + return None + + tail = reason[len(VOCABULARY_WARNING_PREFIX) :] + term, separator, remainder = tail.partition(VOCABULARY_WARNING_DELIMITER) + if not separator: + return None + + vocab_type = remainder.rstrip(".") + if vocab_type.endswith("'"): + vocab_type = vocab_type[:-1] + if not term or not vocab_type: + return None + return term, vocab_type + + +def _extract_draft_id(message): + """Extract the draft id from a draft-related log line.""" + text = _message_without_record_prefix(message) + if not text.startswith("Draft "): + return None + parts = text.split() + return parts[1].rstrip(".") if len(parts) > 1 else None + + +def _leading_inspire_prefix(message): + """Return the leading ``[INSPIRE#...]`` block when present.""" + prefix = _extract_bracket_value(message, BRACKET_INSPIRE_PREFIX) + return f"{BRACKET_INSPIRE_PREFIX}{prefix}]" if prefix else "" + + +# Structured log helpers ----------------------------------------------------- + +def _source(hit): + """Return the normalized log payload.""" + return hit.get("_source") or hit + + +def _nested_value(data, *path): + """Read a nested value from a dictionary.""" + value = data + for key in path: + if not isinstance(value, dict): + return None + value = value.get(key) + if value is None: + return None + return value + + +def _first_value(data, *paths): + """Return the first non-empty value from the provided paths.""" + for path in paths: + if isinstance(path, str): + value = data.get(path) + else: + value = _nested_value(data, *path) + if value not in (None, ""): + return value + return None + + +def _run_stats(run): + """Build a compact summary of record counters for the run.""" + labels = ( + ("inserted_entries_count", _("inserted")), + ("updated_entries_count", _("updated")), + ("errored_entries_count", _("errored")), + ) + parts = [] + for field_name, label in labels: + value = getattr(run, field_name, None) + if value: + parts.append(f"{value} {label}") + return " · ".join(parts) + + +# Log parsing helpers -------------------------------------------------------- + +def _extract_inspire_id(message): + """Extract INSPIRE id from a log message when present.""" + return ( + _extract_bracket_value(message, BRACKET_INSPIRE_PREFIX) + or _extract_identifier_after(message, "INSPIRE#") + or _extract_identifier_after(message, "INSPIRE:") + ) + + +def _extract_entry_id(message): + """Extract entry id from a log message when present.""" + return _extract_bracket_value( + message, BRACKET_ENTRY_PREFIX + ) or _extract_entry_id_from_dict(message) + + +def _message_without_record_prefix(message): + """Strip INSPIRE/CDS prefixes before matching log patterns.""" + return _strip_leading_bracket_prefixes(message).strip() + + +def _is_skip_summary_log(message): + """Return True for legacy batch-summary skip messages.""" + return _is_skip_summary_message(message) + + +def _is_generic_draft_deleted(message): + """Return True for legacy draft-rollback lines without error details.""" + text = _message_without_record_prefix(message).rstrip(".").lower() + return ( + text.startswith("draft ") + and ( + " deleted due to errors" in text + or " is deleted due to errors" in text + or " deleted after create failed" in text + ) + ) + + +def _flatten_error_payload(value, prefix=""): + """Flatten nested error payloads into readable field messages.""" + if isinstance(value, dict): + parts = [] + for key, item in value.items(): + key_text = "" if isinstance(key, int) else str(key) + next_prefix = prefix + if key_text: + next_prefix = f"{prefix}.{key_text}" if prefix else key_text + parts.extend(_flatten_error_payload(item, next_prefix)) + return parts + + if isinstance(value, list): + if value and all(not isinstance(item, (dict, list)) for item in value): + message = ", ".join(_compact(item) for item in value if _compact(item)) + if not message: + return [] + return [f"{prefix}: {message}" if prefix else message] + + parts = [] + for item in value: + parts.extend(_flatten_error_payload(item, prefix)) + return parts + + text = _compact(value) + if not text: + return [] + return [f"{prefix}: {text}" if prefix else text] + + +# Error payload decoding ----------------------------------------------------- + +def _decode_error_repr(value): + """Decode Python-style list/dict error payloads when present.""" + text = str(value or "").strip() + if not (text.startswith("[") or text.startswith("{")): + return text + try: + parsed = ast.literal_eval(text) + except (ValueError, SyntaxError): + return text + + parts = _flatten_error_payload(parsed) + return "; ".join(parts) if parts else text + + +def _parse_skipped_entry_message(message): + """Extract the entry id and inner error from a legacy skipped-entry message.""" + compact_message = _compact(message) + if not compact_message.startswith(SKIPPED_ENTRY_PREFIX): + return None, None + entry_payload, separator, errors_payload = compact_message[ + len(SKIPPED_ENTRY_PREFIX) : + ].partition(SKIPPED_ENTRY_DELIMITER) + if not separator: + return None, None + + entry_id = _extract_entry_id(entry_payload) + errors = _decode_error_repr(errors_payload) + return entry_id, errors + + +def _humanize_reason(reason): + """Turn internal log text into a readable group title.""" + reason = _decode_error_repr(_compact(reason)) + reason = _remove_bracket_block(reason, BRACKET_ENTRY_PREFIX) + reason = _remove_bracket_block(reason, BRACKET_INSPIRE_PREFIX) + reason = _strip_inline_inspire_ids(reason) + reason = reason.replace("failed transformation. Errors:", "") + reason = reason.replace("Errors:", "") + reason = reason.replace("failed transformation.", "") + reason = _compact(reason) + + if _is_skip_summary_message(reason): + return "Entries were skipped because they already had errors." + if reason.startswith("Multiple records match:"): + return "Multiple CDS records match this INSPIRE record." + if reason == "Thesis publication date missing (thesis_info and imprint).": + return ( + "The INSPIRE thesis record is missing a publication date in both " + "thesis_info and imprint." + ) + if reason.startswith("checksum: Field may not be null."): + return "File upload failed because the checksum is missing." + + vocabulary_match = _parse_vocabulary_warning(reason) + if vocabulary_match: + term, vocab_type = vocabulary_match + return ( + f"The vocabulary value '{term}' is missing from the CDS " + f"'{vocab_type}' vocabulary." + ) + + if " not published because of validation errors: " in reason: + subject, details = reason.split( + " not published because of validation errors: ", + 1, + ) + subject = subject[:1].upper() + subject[1:] + return ( + f"{subject} could not be published because it failed " + f"validation: {details}" + ) + + return reason + + +def _is_skipped_entry_log(message, report_kind): + """Return True when the log line represents a skipped entry/error.""" + if report_kind == "skipped_entry": + return True + compact_message = _compact(message) + return compact_message.startswith(SKIPPED_ENTRY_PREFIX) or _is_skip_summary_message( + compact_message + ) + + +# Grouping helpers ----------------------------------------------------------- + +def _collect_detailed_errors_by_record(hits): + """Map record ids to the most useful error line logged for that record.""" + details = {} + for hit in hits: + source = _source(hit) + message = _compact(source.get("message")) + if _is_generic_draft_deleted(message) or _is_skip_summary_log(message): + continue + level = str(source.get("level") or "").upper() + if level not in {"ERROR", "WARNING"}: + continue + record_id = _extract_inspire_id(message) or _extract_entry_id(message) + if not record_id: + continue + detail = _message_without_record_prefix(message) + if not any( + marker in detail + for marker in ( + "Validation error while processing entry", + "Error while processing entry", + "deleted after", + "not published", + "validation errors", + "checksum:", + "Field may not be null", + "does not have a checksum", + ) + ): + continue + details[str(record_id)] = detail + return details + + +def _enrich_draft_deleted_message(message, details_by_record): + """Attach the underlying failure to legacy draft-deletion log lines.""" + if not _is_generic_draft_deleted(message): + return message + + record_id = _extract_inspire_id(message) or _extract_entry_id(message) + draft_id = _extract_draft_id(message) or "unknown" + detail = details_by_record.get(str(record_id)) if record_id else None + if not detail: + return message + + prefix = _leading_inspire_prefix(message) + if prefix: + prefix = f"{prefix} " + return f"{prefix}Draft {draft_id} deleted: {detail}" + + +def _is_non_actionable_log(message): + """Return True for side-effect log lines that should not become failure groups.""" + compact_message = _message_without_record_prefix(message) + return _is_skip_summary_log(compact_message) + + +def _message_reason(message): + """Build a grouping key from the visible log message.""" + reason = _compact(message) + _entry_id, skipped_reason = _parse_skipped_entry_message(reason) + if skipped_reason: + return skipped_reason + reason = _message_without_record_prefix(reason) + if reason.startswith("Draft ") and " deleted: " in reason: + reason = reason.split(" deleted: ", 1)[1] + reason = _remove_prefix(reason, "Error while processing entry:") + reason = _remove_prefix(reason.lstrip(), "Validation error while processing entry:") + reason = _remove_prefix(reason.lstrip(), SKIPPED_ENTRY_PREFIX) + reason = _remove_bracket_block(reason, BRACKET_ENTRY_PREFIX) + if SKIPPED_ENTRY_DELIMITER in reason: + reason = reason.split(SKIPPED_ENTRY_DELIMITER, 1)[1] + return _compact(reason) + + +def _normalize_log_hit(hit, details_by_record=None): + """Convert a raw log hit into the fields used by the report view.""" + source = _source(hit) + message = _compact(source.get("message")) + if details_by_record: + message = _enrich_draft_deleted_message(message, details_by_record) + reason = _compact( + _first_value(source, "report_reason", ("extra", "report_reason")) + ) + group_key = _compact( + _first_value( + source, + "report_group_key", + ("extra", "report_group_key"), + ) + or reason + ) + level = str(source.get("level") or "INFO").upper() + report_kind = _first_value(source, "report_kind", ("extra", "report_kind")) + effective_level = ( + "ERROR" if _is_skipped_entry_log(message, report_kind) else level + ) + if effective_level == "CRITICAL": + effective_level = "ERROR" + + inspire_id = _first_value( + source, + "inspire_id", + ("extra", "inspire_id"), + ("context", "inspire_id"), + ) + if inspire_id is None: + inspire_id = _extract_inspire_id(message) + entry_id = _first_value( + source, + "entry_id", + ("extra", "entry_id"), + ("entry", "id"), + ) + if entry_id is None: + entry_id = _extract_entry_id(message) + skipped_entry_id, skipped_reason = _parse_skipped_entry_message(message) + if entry_id is None and skipped_entry_id is not None: + entry_id = skipped_entry_id + if inspire_id is None and entry_id is not None: + inspire_id = entry_id + + fallback_reason = _message_reason(message) + raw_reason = reason or fallback_reason + raw_group_key = group_key or raw_reason + stable_group_key = _humanize_reason(group_key or fallback_reason) + if _is_skipped_entry_log(message, report_kind): + raw_group_key = raw_reason + stable_group_key = _humanize_reason(raw_reason) + + reason = _humanize_reason(raw_reason) + group_key = stable_group_key + + raw_message = message or _("No log message") + return { + "timestamp": format_timestamp( + _first_value(source, "timestamp", "@timestamp") + ), + "level": effective_level, + "message": raw_message, + "display_message": reason or raw_message, + "report_kind": report_kind, + "raw_report_reason": raw_reason, + "raw_report_group_key": raw_group_key, + "report_reason": reason, + "report_group_key": group_key, + "inspire_id": str(inspire_id) if inspire_id is not None else None, + "entry_id": str(entry_id) if entry_id is not None else None, + "record_id": ( + str(inspire_id) + if inspire_id is not None + else str(entry_id) if entry_id is not None else None + ), + } + + +def _format_plain_text_line(entry): + """Render a single log line for the download view.""" + return f"[{entry['timestamp']}] {entry['level']} {entry['display_message']}".rstrip() + + +# Run resolution and fetching ------------------------------------------------ + def resolve_harvester_run(run_id): """Return a top-level INSPIRE harvester run or raise ``HarvesterRunError``.""" run_id = (run_id or "").strip() @@ -64,17 +579,32 @@ def resolve_harvester_run(run_id): def fetch_harvester_run_logs(run): - """Return ``(hits, total)`` from structured job logs.""" + """Return ``(hits, total)`` for a harvester run.""" try: - result = current_jobs_logs_service.search( - system_identity, - params={ - "q": f'"{run.id}"', - "sort": "timestamp", + full_index_name = prefix_index(current_app.config["JOBS_LOGGING_INDEX"]) + max_results = current_app.config.get("JOBS_LOGS_MAX_RESULTS", 2000) + search_query = { + "query": { + "bool": { + "filter": [ + {"term": {"context.run_id": str(run.id)}}, + {"term": {"context.job_id": str(run.job_id)}}, + ] + } }, + "sort": [ + {"@timestamp": {"order": "asc"}}, + {"_id": {"order": "asc"}}, + ], + "size": max_results, + "track_total_hits": True, + } + response = current_search_client.search( + index=full_index_name, + body=search_query, ) - hits = list(result.hits) - total = result.total or len(hits) + hits = response.get("hits", {}).get("hits", []) + total = response.get("hits", {}).get("total", {}).get("value", len(hits)) except Exception: current_app.logger.exception( "Failed to fetch structured job logs for harvester run %s", run.id @@ -84,33 +614,97 @@ def fetch_harvester_run_logs(run): return hits, total -def lines_from_hits(hits): - """Return de-duplicated log lines and severity counts.""" - task_groups = {} +def group_log_hits(hits, max_examples=5): + """Group structured warning/error log hits by their report key.""" + details_by_record = _collect_detailed_errors_by_record(hits) + records_with_draft_deleted = set() + for hit in hits: + message = _compact(_source(hit).get("message")) + if _is_generic_draft_deleted(message): + record_id = _extract_inspire_id(message) or _extract_entry_id(message) + if record_id: + records_with_draft_deleted.add(str(record_id)) + seen = set() - error_count = 0 - warning_count = 0 + buckets = OrderedDict() + other_lines = [] + for hit in hits: - raw_ts = hit.get("timestamp") - level = hit.get("level", "INFO") - message = re.sub(r"\s+", " ", (hit.get("message") or "")).strip() - key = (raw_ts, level, message) - if key in seen: + raw_message = _compact(_source(hit).get("message")) + entry = _normalize_log_hit(hit, details_by_record) + if _is_non_actionable_log(entry["message"]): continue - seen.add(key) - if level == "ERROR": - error_count += 1 - elif level == "WARNING": - warning_count += 1 - task_id = (hit.get("context") or {}).get("task_id") or "unknown" - task_groups.setdefault(task_id, []).append( - f"[{format_timestamp(raw_ts)}] {level} {message}" + + record_id = str(entry.get("record_id") or "") + if ( + record_id in records_with_draft_deleted + and not _is_generic_draft_deleted(raw_message) + and record_id in details_by_record + and _message_without_record_prefix(raw_message).rstrip(".") + == details_by_record[record_id].rstrip(".") + ): + continue + + dedupe_key = ( + entry["timestamp"], + entry["level"], + entry["message"], + entry["record_id"], ) - lines = [line for group in task_groups.values() for line in group] - return lines, error_count, warning_count + if dedupe_key in seen: + continue + seen.add(dedupe_key) + + if entry["level"] in {"ERROR", "WARNING"} and entry["report_group_key"]: + bucket_key = (entry["level"], entry["report_group_key"]) + bucket = buckets.setdefault( + bucket_key, + { + "level": entry["level"], + "title": ( + entry["raw_report_reason"] or entry["raw_report_group_key"] + ), + "entries": [], + "records": [], + "examples": [], + "first_timestamp": entry["timestamp"], + }, + ) + bucket["entries"].append(entry) + if entry["record_id"] and entry["record_id"] not in bucket["records"]: + bucket["records"].append(entry["record_id"]) + if entry["inspire_id"] and entry["inspire_id"] not in bucket["examples"]: + bucket["examples"].append(entry["inspire_id"]) + continue + + other_lines.append(entry) + grouped_issues = [] + for bucket in buckets.values(): + grouped_issues.append( + { + "level": bucket["level"], + "title": bucket["title"], + "count": len(bucket["records"]) or len(bucket["entries"]), + "records": bucket["records"], + "examples": bucket["examples"][:max_examples], + "entries": bucket["entries"], + "first_timestamp": bucket["first_timestamp"], + } + ) -def plain_text_log(run, lines, total, error_count, warning_count): + grouped_issues.sort( + key=lambda issue: (-issue["count"], issue["first_timestamp"], issue["title"]) + ) + + error_count = sum(1 for issue in grouped_issues if issue["level"] == "ERROR") + warning_count = sum(1 for issue in grouped_issues if issue["level"] == "WARNING") + return grouped_issues, other_lines, error_count, warning_count + + +# Report rendering ----------------------------------------------------------- + +def plain_text_log(run, grouped_issues, other_lines, total, error_count, warning_count): """Build the plain-text log file content.""" max_results = current_app.config.get("JOBS_LOGS_MAX_RESULTS", 2000) status = getattr(run.status, "name", str(run.status)) @@ -121,6 +715,10 @@ def plain_text_log(run, lines, total, error_count, warning_count): if run.finished_at: header.append(f"Finished: {format_timestamp(run.finished_at)}") + run_stats = _run_stats(run) + if run_stats: + header.append(f"Records: {run_stats}") + summary = [] if status in ("FAILED", "PARTIAL_SUCCESS", "SUCCESS"): summary.append( @@ -133,40 +731,59 @@ def plain_text_log(run, lines, total, error_count, warning_count): if run.message: summary.append(run.message) if error_count: - summary.append(_("%(count)s error(s) found in logs below", count=error_count)) + summary.append(_("%(count)s grouped error(s) found below", count=error_count)) if warning_count: summary.append( - _("%(count)s warning(s) found in logs below", count=warning_count) + _("%(count)s grouped warning(s) found below", count=warning_count) ) if summary: header.append("") header.extend(summary) - if total and total > len(lines): + rendered_lines = sum(len(issue["entries"]) for issue in grouped_issues) + len( + other_lines + ) + if total and total > rendered_lines: header.append( - f"Showing first {len(lines)} of {total} log entries " + f"Showing first {rendered_lines} of {total} log entries " f"(truncated at JOBS_LOGS_MAX_RESULTS={max_results})." ) header.append("=" * 80) - logs = "\n".join(header + lines) - if not lines: - logs += "\n" + (run.message or "No logs available for this run.\n") - return logs + body = [] + for issue in grouped_issues: + body.append(f"{issue['level']}: {issue['title']}") + if issue["records"]: + body.append("Affected records: " + ", ".join(issue["records"])) + body.extend(_format_plain_text_line(entry) for entry in issue["entries"]) + body.append("") + + if other_lines: + body.append("Other log lines") + body.append("-" * 80) + body.extend(_format_plain_text_line(entry) for entry in other_lines) + + logs = "\n".join(header + body).rstrip() + if not body: + logs += "\n" + (run.message or "No logs available for this run.") + return logs + "\n" def report_context(run_id): """Build context for the colored HTML report page.""" run = resolve_harvester_run(run_id) hits, total = fetch_harvester_run_logs(run) - lines, error_count, _unused_warnings = lines_from_hits(hits) + grouped_issues, other_lines, error_count, warning_count = group_log_hits(hits) status = getattr(run.status, "name", str(run.status)) + rendered_lines = sum(len(issue["entries"]) for issue in grouped_issues) + len( + other_lines + ) truncation_message = None - if total and total > len(lines): + if total and total > rendered_lines: truncation_message = ( f"Log results truncated. Too many log results returned ({total}). " - f"Only the most recent {len(lines)} results are shown." + f"Only the most recent {rendered_lines} results are shown." ) display_title = (getattr(run, "title", None) or "").strip() or f"Run {run.id}" @@ -177,6 +794,18 @@ def report_context(run_id): "started_at": format_timestamp(run.started_at), "finished_at": format_timestamp(run.finished_at) if run.finished_at else None, "truncation_message": truncation_message, - "lines": lines, + "grouped_errors": [ + issue for issue in grouped_issues if issue["level"] == "ERROR" + ], + "grouped_warnings": [ + issue for issue in grouped_issues if issue["level"] == "WARNING" + ], + "other_lines": other_lines, "error_count": error_count, + "warning_count": warning_count, + "run_stats": _run_stats(run), + "failure_summary_lines": [ + line.strip() for line in (run.message or "").splitlines() if line.strip() + ], + "inspire_literature_url": INSPIRE_LITERATURE_URL, } diff --git a/site/cds_rdm/inspire_harvester/writer.py b/site/cds_rdm/inspire_harvester/writer.py index 3917c118..e60297bc 100644 --- a/site/cds_rdm/inspire_harvester/writer.py +++ b/site/cds_rdm/inspire_harvester/writer.py @@ -6,7 +6,6 @@ # the terms of the MIT License; see LICENSE file for more details. """Writer module.""" -import logging import time from collections import OrderedDict from copy import deepcopy @@ -25,18 +24,120 @@ from cds_rdm.inspire_harvester.logger import hlog from cds_rdm.inspire_harvester.update.config import UPDATE_STRATEGY_CONFIG -from cds_rdm.inspire_harvester.update.engine import ( - UpdateContext, - UpdateEngine, - UpdateEngineConflict, -) +from cds_rdm.inspire_harvester.update.engine import UpdateContext, UpdateEngine class InspireWriter(BaseWriter): """INSPIRE writer.""" + @staticmethod + def _compact_error(value): + """Collapse whitespace in exception details.""" + return " ".join(str(value or "").split()).strip() + + @classmethod + def _flatten_validation_errors(cls, value, prefix=""): + """Flatten nested validation payloads into readable field messages.""" + if isinstance(value, dict): + parts = [] + for key, item in value.items(): + key_text = "" if isinstance(key, int) else str(key) + next_prefix = prefix + if key_text: + next_prefix = f"{prefix}.{key_text}" if prefix else key_text + parts.extend(cls._flatten_validation_errors(item, next_prefix)) + return parts + + if isinstance(value, list): + if value and all(not isinstance(item, (dict, list)) for item in value): + message = ", ".join( + cls._compact_error(item) for item in value if cls._compact_error(item) + ) + if not message: + return [] + return [f"{prefix}: {message}" if prefix else message] + + parts = [] + for item in value: + parts.extend(cls._flatten_validation_errors(item, prefix)) + return parts + + text = cls._compact_error(value) + if not text: + return [] + return [f"{prefix}: {text}" if prefix else text] + + @classmethod + def _format_validation_error(cls, error): + """Return a readable validation error summary.""" + messages = getattr(error, "messages", error) + parts = cls._flatten_validation_errors(messages) + return "; ".join(parts) if parts else cls._compact_error(messages) + + @classmethod + def _describe_exception(cls, error): + """Return a compact exception label for logs and report messages.""" + details = cls._compact_error(error) + if details: + return f"{error.__class__.__name__}: {details}" + return error.__class__.__name__ + + @staticmethod + def _context_labels(inspire_id=None, record_pid=None, draft_id=None, file_key=None): + """Build readable context labels for developer-facing logs.""" + labels = [] + if inspire_id: + labels.append(f"INSPIRE#{inspire_id}") + if record_pid: + labels.append(f"record {record_pid}") + if draft_id: + labels.append(f"draft {draft_id}") + if file_key: + labels.append(f"file '{file_key}'") + return labels + + def _raise_unexpected_operation_error( + self, + *, + subject, + action, + error, + logger=None, + inspire_id=None, + record_pid=None, + draft_id=None, + file_key=None, + ): + """Log traceback-rich context and raise a readable writer error.""" + context = ", ".join( + self._context_labels( + inspire_id=inspire_id, + record_pid=record_pid, + draft_id=draft_id, + file_key=file_key, + ) + ) + log_message = f"Unexpected error while handling {subject}" + if context: + log_message = f"{log_message} ({context})" + + active_logger = logger or current_app.logger + active_logger.exception(log_message) + + raise self._writer_error( + f"The {subject} could not be {action} because " + f"{self._describe_exception(error)}" + ) from error + + @staticmethod + def _writer_error(message): + """Return a compact ``WriterError`` instance.""" + return WriterError(message) + @hlog - def _write_entry(self, stream_entry, *args, inspire_id=None, logger=None, **kwargs): + def _write_entry( + self, stream_entry, *args, inspire_id=None, logger=None, **kwargs + ): """Write entry to CDS.""" existing_records = self._get_existing_records(stream_entry) @@ -47,12 +148,9 @@ def _write_entry(self, stream_entry, *args, inspire_id=None, logger=None, **kwar existing_records_hits = existing_records.to_dict()["hits"]["hits"] existing_records_ids = [hit["id"] for hit in existing_records_hits] if multiple_records_found: - msg = "Multiple records match: {0}".format(", ".join(existing_records_ids)) - logger.error(msg) - stream_entry.errors.append(f"[inspire_id={inspire_id}] {msg}") - + stream_entry.errors.append(msg) return None elif should_update: @@ -84,15 +182,12 @@ def _process_entry( try: op_type = self._write_entry(stream_entry, *args, **kwargs) except WriterError as e: - error_message = f"Error while processing entry : {str(e)}." + error_message = self._compact_error(e) except ValidationError as e: - error_message = f"Validation error while processing entry: {str(e)}." - # except Exception as e: - # raise e - # error_message = f"Unexpected error while processing entry: {str(e)}." + error_message = self._format_validation_error(e) if error_message: - logger.error(error_message) - stream_entry.errors.append(f"[inspire_id={inspire_id}] {error_message}") + logger.error(f"Error while processing entry: {error_message}") + stream_entry.errors.append(error_message) stream_entry.op_type = op_type return stream_entry @@ -172,11 +267,12 @@ def _get_existing_records( }, ) - for filter_key, filter in filters_priority.items(): - - if filter["value"]: - combined_filter = dsl.Q("bool", filter=filter["filter"]) - logger.debug(f"Searching for existing records: {filter['filter']}") + for search_filter in filters_priority.values(): + if search_filter["value"]: + combined_filter = dsl.Q("bool", filter=search_filter["filter"]) + logger.debug( + f"Searching for existing records: {search_filter['filter']}" + ) result = current_rdm_records_service.search( system_identity, extra_filter=combined_filter @@ -219,10 +315,8 @@ def update_record( ) # Normalize the checksum format in existing for comparison - existing_checksums = [ - value["checksum"] for key, value in existing_files.items() - ] - new_checksums = [value["checksum"] for key, value in new_files.items()] + existing_checksums = [value["checksum"] for value in existing_files.values()] + new_checksums = [value["checksum"] for value in new_files.values()] logger.debug(f"Existing files' checksums: {existing_checksums}.") logger.debug(f"New files' checksums: {new_checksums}.") @@ -244,7 +338,7 @@ def update_record( engine = UpdateEngine( strategies=UPDATE_STRATEGY_CONFIG, - fail_on_conflict=True + fail_on_conflict=True, ) ctx = UpdateContext(source="inspire_import") result = engine.update(record_dict, entry, ctx, logger) @@ -279,20 +373,27 @@ def update_record( logger.info(f"Success: Record {record_pid} updated and published.") except ValidationError as e: - logger.error( - f"Failure: draft {record_pid} not published, validation errors: {e}." - ) current_rdm_records_service.delete_draft(system_identity, draft["id"]) - raise e + raise self._writer_error( + "The updated draft could not be published because it " + f"failed validation: {self._format_validation_error(e)}" + ) except ValidationErrorWithMessageAsList as e: current_rdm_records_service.delete_draft(system_identity, draft["id"]) - raise WriterError( - f"ERROR: Draft {draft['id']} not published, validation errors: {e.messages}." + raise self._writer_error( + "The updated draft could not be published because it " + f"failed validation: {self._format_validation_error(e)}" ) except Exception as e: current_rdm_records_service.delete_draft(system_identity, draft["id"]) - raise WriterError( - f"Draft {draft.id} failed publishing because of an unexpected error: {str(e)}." + self._raise_unexpected_operation_error( + subject=f"updated draft {draft.id}", + action="published", + error=e, + logger=logger, + inspire_id=inspire_id, + record_pid=record_pid, + draft_id=draft.id, ) @hlog @@ -306,16 +407,14 @@ def _update_files( logger=None, ): entry = stream_entry.entry - logger.info("Updating files for record {}".format(record.id)) + logger.info(f"Updating files for record {record.id}") record_dict = record.data existing_files = record_dict["files"]["entries"] new_files = entry["files"].get("entries", {}) - existing_checksums = [ - value["checksum"] for key, value in existing_files.items() - ] - new_checksums = [value["checksum"] for key, value in new_files.items()] + existing_checksums = [value["checksum"] for value in existing_files.values()] + new_checksums = [value["checksum"] for value in new_files.values()] files_to_create = list(set(new_checksums) - set(existing_checksums)) files_to_delete = list(set(existing_checksums) - set(new_checksums)) @@ -345,15 +444,22 @@ def _update_files( file_content = self._fetch_file(stream_entry, inspire_url) if not file_content: - logger.error(f"Failed to fetch file content for: {key}") - return + raise self._writer_error( + f"Could not fetch the INSPIRE file '{key}' after repeated download failures." + ) self._create_file(stream_entry, file, file_content, new_draft) logger.info(f"{len(new_files.items())} files successfully created.") @hlog def _create_new_version( - self, stream_entry, update_metadata, record, inspire_id=None, record_pid=None, logger=None + self, + stream_entry, + update_metadata, + record, + inspire_id=None, + record_pid=None, + logger=None, ): """For records with updated files coming from INSPIRE, create and publish a new version.""" entry = stream_entry.entry @@ -405,15 +511,30 @@ def _create_new_version( current_rdm_records_service.delete_draft( system_identity, new_version_draft.id ) - raise WriterError( - f"Failure: Draft {new_version_draft.id} not published, validation errors: {e}." + raise self._writer_error( + "The new version draft could not be published because it " + f"failed validation: {self._format_validation_error(e)}" ) except ValidationErrorWithMessageAsList as e: current_rdm_records_service.delete_draft( system_identity, new_version_draft.id ) - raise WriterError( - f"Failure: draft {new_version_draft.id} not published, validation errors: {e.messages}." + raise self._writer_error( + "The new version draft could not be published because it " + f"failed validation: {self._format_validation_error(e)}" + ) + except Exception as e: + current_rdm_records_service.delete_draft( + system_identity, new_version_draft.id + ) + self._raise_unexpected_operation_error( + subject=f"new version draft {new_version_draft.id}", + action="published", + error=e, + logger=logger, + inspire_id=inspire_id, + record_pid=record_pid, + draft_id=new_version_draft.id, ) @hlog @@ -443,7 +564,9 @@ def _create_new_record( DATACITE_PREFIX = current_app.config["DATACITE_PREFIX"] is_cds = DATACITE_PREFIX in doi.get("identifier", "") if is_cds: - raise WriterError("Trying to create record with CDS DOI") + raise self._writer_error( + "The INSPIRE record points to a CDS DOI, so it cannot be created as a new CDS record." + ) file_entries = entry["files"].get("entries", None) logger.debug(f"Files to create: {len(file_entries) if file_entries else 0}") @@ -451,6 +574,7 @@ def _create_new_record( logger.debug("Creating new record draft") draft = current_rdm_records_service.create(system_identity, data=entry) + current_file_key = None logger.info(f"New draft is created ({draft.id}).") @@ -461,22 +585,46 @@ def _create_new_record( ) for key, file_data in file_entries.items(): + current_file_key = key logger.debug(f"Processing file: {key}") + if not file_data.get("checksum"): + raise self._writer_error( + "The INSPIRE file " + f"'{file_data.get('key', key)}' cannot be imported because " + "INSPIRE did not provide a checksum." + ) + inspire_url = file_data.pop("source_url", None) file_content = self._fetch_file(stream_entry, inspire_url) if not file_content: - logger.error(f"Failed to fetch file content for: {key}") - - return + raise self._writer_error( + f"Could not fetch the INSPIRE file '{key}' after repeated download failures." + ) self._create_file(stream_entry, file_data, file_content, draft) logger.info(f"All the files successfully created.") + except ValidationError as e: + current_rdm_records_service.delete_draft(system_identity, draft["id"]) + raise self._writer_error( + "The new draft could not be created because file validation " + f"failed: {self._format_validation_error(e)}" + ) from e + except WriterError: + current_rdm_records_service.delete_draft(system_identity, draft["id"]) + raise except Exception as e: current_rdm_records_service.delete_draft(system_identity, draft["id"]) - logger.error(f"Draft {draft.id} is deleted due to errors.") - raise e + self._raise_unexpected_operation_error( + subject=f"file import for new draft {draft.id}", + action="completed", + error=e, + logger=logger, + inspire_id=inspire_id, + draft_id=draft.id, + file_key=current_file_key, + ) else: try: self._add_community(stream_entry, draft) @@ -489,17 +637,26 @@ def _create_new_record( except ValidationError as e: current_rdm_records_service.delete_draft(system_identity, draft["id"]) - raise WriterError( - f"Failure: draft {draft['id']} not published, validation errors: {e}." + raise self._writer_error( + "The new draft could not be published because it failed " + f"validation: {self._format_validation_error(e)}" ) except ValidationErrorWithMessageAsList as e: current_rdm_records_service.delete_draft(system_identity, draft["id"]) - raise WriterError( - f"Failure: draft {draft['id']} not published, validation errors: {e.messages}." + raise self._writer_error( + "The new draft could not be published because it failed " + f"validation: {self._format_validation_error(e)}" ) except Exception as e: current_rdm_records_service.delete_draft(system_identity, draft["id"]) - raise e + self._raise_unexpected_operation_error( + subject=f"new draft {draft.id}", + action="published", + error=e, + logger=logger, + inspire_id=inspire_id, + draft_id=draft.id, + ) @hlog def _fetch_file( @@ -544,10 +701,7 @@ def _fetch_file( logger.debug("Retrying in 1 minute...") time.sleep(60) - logger.error( - f"Retrieving file request failed. Max retries {max_retries} reached." - f" URL: {inspire_url}." - ) + return None @hlog def _create_file( @@ -567,6 +721,13 @@ def _create_file( file_source = file_data.get("source") new_checksum = None + if not inspire_checksum: + raise self._writer_error( + "The INSPIRE file " + f"'{file_data['key']}' cannot be imported because INSPIRE did " + "not provide a checksum." + ) + try: service.draft_files.init_files( system_identity, @@ -599,14 +760,9 @@ def _create_file( assert inspire_checksum == new_checksum elif inspire_checksum and file_source == "arxiv": assert inspire_checksum == new_checksum - except AssertionError as e: - ## TODO draft? delete record completely? - logger.error( - f"Files checksums don't match. Delete file: '{file_data['key']}' from draft." - ) - + except AssertionError: service.draft_files.delete_file(system_identity, draft.id, file_data["key"]) - - raise WriterError( - f"File {file_data['key']} checksum mismatch. Expected: {inspire_checksum}, got: {new_checksum}." - ) \ No newline at end of file + raise self._writer_error( + "File checksum mismatch for " + f"'{file_data['key']}'. Expected {inspire_checksum}, got {new_checksum}." + ) diff --git a/site/cds_rdm/templates/semantic-ui/cds_rdm/harvester_download/report_body.html b/site/cds_rdm/templates/semantic-ui/cds_rdm/harvester_download/report_body.html index 761ba03a..f42747a7 100644 --- a/site/cds_rdm/templates/semantic-ui/cds_rdm/harvester_download/report_body.html +++ b/site/cds_rdm/templates/semantic-ui/cds_rdm/harvester_download/report_body.html @@ -4,9 +4,62 @@ CDS-RDM is free software; you can redistribute it and/or modify it under the terms of the GPL-2.0 License; see LICENSE file for more details. -Job run report body: layout aligned with invenio_jobs RunsLogs. Presentation-only -logic lives here; resource.py only supplies run, lines, and counts. +Job run report body: layout aligned with invenio_jobs RunsLogs. Grouped issues +use collapsible sections; expanded content shows all affected records and +per-record log lines in the same format as the stock job log viewer. #} +{% macro render_issue_group(issue, level_name, label_color) %} +
+ + {{ level_name }} + {{ issue.title }} + + {{ _("%(count)s records", count=issue.count) }} + + {% if issue.examples %} + + {{ _("Examples") }}: + {% for inspire_id in issue.examples %} + + INSPIRE#{{ inspire_id }} + {% if not loop.last %}, {% endif %} + {% endfor %} + + {% endif %} + +
+ {% if issue.records %} +
+ {{ _("Records") }} ({{ issue.records | length }}): + +
+ {% endif %} +
+ {% for entry in issue.entries %} +
+ [{{ entry.timestamp }}] + {{ entry.level }} + {{ entry.message }} +
+ {% endfor %} +
+
+
+{% endmacro %} +
{% if truncation_message %}
@@ -66,26 +119,69 @@

{{ _("Job run") }}

{% if status == "FAILED" %}{{ _("Job failed") }}{% else %}{{ _("Job partially succeeded") }}{% endif %}
- {% if run.message %} + {% if run_stats %} +

{{ run_stats }}

+ {% elif failure_summary_lines %} +
{{ failure_summary_lines | join("\n") }}
+ {% elif run.message %}
{{ run.message }}
{% endif %} - {% if error_count %} -

{{ _("%(count)s error(s) found in logs below", count=error_count) }}

+ {% if error_count or warning_count %} +

+ {% if error_count %}{{ _("%(count)s error(s)", count=error_count) }}{% endif %} + {% if error_count and warning_count %} · {% endif %} + {% if warning_count %}{{ _("%(count)s warning(s)", count=warning_count) }}{% endif %} + {{ _("in grouped log view below") }} +

{% endif %}
{% endif %} -
- {% for line in lines %} -
- {{ line }} -
+ {% if grouped_errors or grouped_warnings %} + {% if grouped_errors %} +

+ + {{ _("Top failure reasons") }} +

+
+ {% for issue in grouped_errors %} + {{ render_issue_group(issue, "ERROR", "red") }} {% endfor %} - {% if not lines %} -

{{ _("No log lines in this view.") }}

- {% endif %}
+ {% endif %} + + {% if grouped_warnings %} +

+ + {{ _("Warnings") }} +

+
+ {% for issue in grouped_warnings %} + {{ render_issue_group(issue, "WARNING", "orange") }} + {% endfor %} +
+ {% endif %} + {% else %} +
+

{{ _("No grouped issues found for this run.") }}

+
+ {% endif %} + + {% if other_lines and (grouped_errors or grouped_warnings) %} +
+ {{ _("Other log lines") }} ({{ other_lines | length }}) +
+ {% for item in other_lines %} +
+ [{{ item.timestamp }}] + {{ item.level }} + {{ item.message }} +
+ {% endfor %} +
+
+ {% endif %}