diff --git a/site/cds_rdm/ext.py b/site/cds_rdm/ext.py
index 17d9dcd7..668a5eb4 100644
--- a/site/cds_rdm/ext.py
+++ b/site/cds_rdm/ext.py
@@ -6,6 +6,14 @@
# the terms of the GPL-2.0 License; see LICENSE file for more details.
"""CDS-RDM module."""
+from datetime import datetime
+
+from flask import current_app
+from invenio_jobs.logging.jobs import ContextAwareOSHandler, job_context
+from invenio_jobs.services import JobLogEntrySchema
+from invenio_search import current_search_client
+from invenio_search.utils import prefix_index
+
from cds_rdm.clc_sync.resources.config import CLCSyncResourceConfig
from cds_rdm.clc_sync.resources.resource import CLCSyncResource
from cds_rdm.clc_sync.resources.utils import get_clc_sync_entry
@@ -21,6 +29,44 @@
from .views import get_linked_records_search_query
+class CDSContextAwareOSHandler(ContextAwareOSHandler):
+ """Job log handler that preserves selected structured fields."""
+
+ _extra_fields = (
+ "entry_id",
+ "inspire_id",
+ "record_pid",
+ "report_group_key",
+ "report_kind",
+ "report_reason",
+ "skip_sentry",
+ )
+
+ def enrich_log(self, record):
+ """Enrich log record with context and supported extra fields."""
+ context = dict(job_context.get())
+ log_data = {
+ "timestamp": datetime.now().isoformat(),
+ "level": record.levelname,
+ "message": record.getMessage(),
+ "module": record.module,
+ "function": record.funcName,
+ "line": record.lineno,
+ "context": context,
+ }
+ serialized_data = JobLogEntrySchema().load(log_data)
+ for field_name in self._extra_fields:
+ value = getattr(record, field_name, None)
+ if value is not None:
+ serialized_data[field_name] = value
+ return serialized_data
+
+ def index_in_os(self, log_data):
+ """Send log data to OpenSearch."""
+ full_index_name = prefix_index(current_app.config["JOBS_LOGGING_INDEX"])
+ current_search_client.index(index=full_index_name, body=log_data)
+
+
class CDS_RDM_App(object):
"""CDS-RDM App."""
@@ -39,6 +85,7 @@ def init_app(self, app):
"""Flask application initialization."""
self.init_services(app)
self.init_resources(app)
+ self.init_job_logging(app)
app.jinja_env.globals["get_clc_sync_entry"] = get_clc_sync_entry
app.jinja_env.globals["evaluate_permissions"] = evaluate_permissions
# Register filter for building linked records search query
@@ -61,6 +108,18 @@ def init_resources(self, app):
config=HarvesterDownloadResourceConfig,
)
+ def init_job_logging(self, app):
+ """Replace the default jobs log handler with the CDS-aware variant."""
+ handlers = list(app.logger.handlers)
+ for handler in handlers:
+ if type(handler) is ContextAwareOSHandler:
+ app.logger.removeHandler(handler)
+
+ if app.config.get("JOBS_LOGGING"):
+ os_handler = CDSContextAwareOSHandler()
+ os_handler.setLevel(app.config["JOBS_LOGGING_LEVEL"])
+ app.logger.addHandler(os_handler)
+
class CDS_RDM_UI(object):
"""CDS-RDM extension."""
diff --git a/site/cds_rdm/harvester_download/resources/resource.py b/site/cds_rdm/harvester_download/resources/resource.py
index f9b37c3b..22258a93 100644
--- a/site/cds_rdm/harvester_download/resources/resource.py
+++ b/site/cds_rdm/harvester_download/resources/resource.py
@@ -16,7 +16,7 @@
from cds_rdm.harvester_runs.logs import (
HarvesterRunError,
fetch_harvester_run_logs,
- lines_from_hits,
+ group_log_hits,
plain_text_log,
resolve_harvester_run,
)
@@ -48,8 +48,15 @@ def download(self):
raise self._http_json_error(error.message, error.code)
hits, total = fetch_harvester_run_logs(run)
- lines, error_count, warning_count = lines_from_hits(hits)
- logs = plain_text_log(run, lines, total, error_count, warning_count)
+ grouped_issues, other_lines, error_count, warning_count = group_log_hits(hits)
+ logs = plain_text_log(
+ run,
+ grouped_issues,
+ other_lines,
+ total,
+ error_count,
+ warning_count,
+ )
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"harvester_logs_{run.id}_{timestamp}.log"
diff --git a/site/cds_rdm/harvester_runs/logs.py b/site/cds_rdm/harvester_runs/logs.py
index a49e03a9..93ddec09 100644
--- a/site/cds_rdm/harvester_runs/logs.py
+++ b/site/cds_rdm/harvester_runs/logs.py
@@ -7,18 +7,26 @@
"""Helpers for INSPIRE harvester run logs."""
-import re
+import ast
import uuid
+from collections import OrderedDict
from datetime import datetime
from flask import current_app
from flask_babel import format_datetime
-from invenio_access.permissions import system_identity
from invenio_i18n import gettext as _
from invenio_jobs.models import Run
-from invenio_jobs.proxies import current_jobs_logs_service
+from invenio_search import current_search_client
+from invenio_search.utils import prefix_index
INSPIRE_HARVESTER_TASK = "process_inspire"
+INSPIRE_LITERATURE_URL = "https://inspirehep.net/literature/"
+BRACKET_INSPIRE_PREFIX = "[INSPIRE#"
+BRACKET_ENTRY_PREFIX = "[entry_id="
+SKIPPED_ENTRY_PREFIX = "Skipped entry "
+SKIPPED_ENTRY_DELIMITER = " with errors: "
+VOCABULARY_WARNING_PREFIX = "Vocabulary term '"
+VOCABULARY_WARNING_DELIMITER = "' not found in '"
class HarvesterRunError(Exception):
@@ -33,18 +41,525 @@ def __init__(self, message, code):
def format_timestamp(value):
"""Format timestamps for display."""
- if value is None or value == "":
+ if value in (None, ""):
return "N/A"
if isinstance(value, datetime):
dt = value
else:
try:
dt = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
- except (ValueError, TypeError):
+ except (TypeError, ValueError):
return str(value)
return format_datetime(dt, "yyyy-MM-dd HH:mm")
+# String helpers -------------------------------------------------------------
+
+def _compact(value):
+ """Collapse whitespace while keeping the original text readable."""
+ text = str(value or "").replace("\\n", " ").replace("\n", " ")
+ return " ".join(text.split()).strip()
+
+
+def _remove_prefix(text, prefix):
+ """Remove a prefix when present."""
+ return text[len(prefix) :] if text.startswith(prefix) else text
+
+
+def _extract_bracket_value(text, prefix):
+ """Extract ``[prefixVALUE]`` blocks from a message."""
+ text = str(text or "")
+ start = text.find(prefix)
+ if start == -1:
+ return None
+ value_start = start + len(prefix)
+ value_end = text.find("]", value_start)
+ if value_end == -1:
+ return None
+ value = text[value_start:value_end].strip()
+ return value or None
+
+
+def _extract_identifier_after(text, marker):
+ """Extract an identifier that immediately follows a marker."""
+ text = str(text or "")
+ start = text.find(marker)
+ if start == -1:
+ return None
+ cursor = start + len(marker)
+ while cursor < len(text) and text[cursor].isspace():
+ cursor += 1
+
+ end = cursor
+ while end < len(text) and (
+ text[end].isalnum() or text[end] in {"_", "-"}
+ ):
+ end += 1
+
+ value = text[cursor:end].strip()
+ return value or None
+
+
+def _extract_entry_id_from_dict(text):
+ """Extract ``id`` from a stringified entry dictionary when possible."""
+ text = str(text or "").strip()
+ if not text.startswith("{"):
+ return None
+ try:
+ parsed = ast.literal_eval(text)
+ except (ValueError, SyntaxError):
+ return None
+ if isinstance(parsed, dict):
+ value = parsed.get("id")
+ return str(value) if value not in (None, "") else None
+ return None
+
+
+def _strip_leading_bracket_prefixes(message):
+ """Drop leading ``[prefix]`` blocks from a log line."""
+ text = _compact(message)
+ while text.startswith("["):
+ end = text.find("]")
+ if end == -1:
+ break
+ text = text[end + 1 :].lstrip()
+ return text
+
+
+def _remove_bracket_block(text, prefix):
+ """Remove ``[prefixVALUE]`` blocks wherever they appear."""
+ text = str(text or "")
+ while True:
+ start = text.find(prefix)
+ if start == -1:
+ return text
+ end = text.find("]", start + len(prefix))
+ if end == -1:
+ return text
+ text = f"{text[:start]} {text[end + 1:]}"
+
+
+def _strip_inline_inspire_ids(text):
+ """Remove inline ``INSPIRE#123`` and ``INSPIRE:123`` references."""
+ result = []
+ for token in str(text or "").split():
+ cleaned = token.strip(".,;:()[]")
+ if cleaned.startswith(("INSPIRE#", "INSPIRE:")):
+ continue
+ result.append(token)
+ return " ".join(result)
+
+
+def _is_skip_summary_message(message):
+ """Return True for ``Skipping N transformed entries with errors`` lines."""
+ text = _compact(message).rstrip(".").lower()
+ return text.startswith("skipping ") and text.endswith(
+ " transformed entries with errors"
+ )
+
+
+def _parse_vocabulary_warning(reason):
+ """Extract term and vocabulary name from a vocabulary warning."""
+ reason = _compact(reason)
+ if not reason.startswith(VOCABULARY_WARNING_PREFIX):
+ return None
+
+ tail = reason[len(VOCABULARY_WARNING_PREFIX) :]
+ term, separator, remainder = tail.partition(VOCABULARY_WARNING_DELIMITER)
+ if not separator:
+ return None
+
+ vocab_type = remainder.rstrip(".")
+ if vocab_type.endswith("'"):
+ vocab_type = vocab_type[:-1]
+ if not term or not vocab_type:
+ return None
+ return term, vocab_type
+
+
+def _extract_draft_id(message):
+ """Extract the draft id from a draft-related log line."""
+ text = _message_without_record_prefix(message)
+ if not text.startswith("Draft "):
+ return None
+ parts = text.split()
+ return parts[1].rstrip(".") if len(parts) > 1 else None
+
+
+def _leading_inspire_prefix(message):
+ """Return the leading ``[INSPIRE#...]`` block when present."""
+ prefix = _extract_bracket_value(message, BRACKET_INSPIRE_PREFIX)
+ return f"{BRACKET_INSPIRE_PREFIX}{prefix}]" if prefix else ""
+
+
+# Structured log helpers -----------------------------------------------------
+
+def _source(hit):
+ """Return the normalized log payload."""
+ return hit.get("_source") or hit
+
+
+def _nested_value(data, *path):
+ """Read a nested value from a dictionary."""
+ value = data
+ for key in path:
+ if not isinstance(value, dict):
+ return None
+ value = value.get(key)
+ if value is None:
+ return None
+ return value
+
+
+def _first_value(data, *paths):
+ """Return the first non-empty value from the provided paths."""
+ for path in paths:
+ if isinstance(path, str):
+ value = data.get(path)
+ else:
+ value = _nested_value(data, *path)
+ if value not in (None, ""):
+ return value
+ return None
+
+
+def _run_stats(run):
+ """Build a compact summary of record counters for the run."""
+ labels = (
+ ("inserted_entries_count", _("inserted")),
+ ("updated_entries_count", _("updated")),
+ ("errored_entries_count", _("errored")),
+ )
+ parts = []
+ for field_name, label in labels:
+ value = getattr(run, field_name, None)
+ if value:
+ parts.append(f"{value} {label}")
+ return " · ".join(parts)
+
+
+# Log parsing helpers --------------------------------------------------------
+
+def _extract_inspire_id(message):
+ """Extract INSPIRE id from a log message when present."""
+ return (
+ _extract_bracket_value(message, BRACKET_INSPIRE_PREFIX)
+ or _extract_identifier_after(message, "INSPIRE#")
+ or _extract_identifier_after(message, "INSPIRE:")
+ )
+
+
+def _extract_entry_id(message):
+ """Extract entry id from a log message when present."""
+ return _extract_bracket_value(
+ message, BRACKET_ENTRY_PREFIX
+ ) or _extract_entry_id_from_dict(message)
+
+
+def _message_without_record_prefix(message):
+ """Strip INSPIRE/CDS prefixes before matching log patterns."""
+ return _strip_leading_bracket_prefixes(message).strip()
+
+
+def _is_skip_summary_log(message):
+ """Return True for legacy batch-summary skip messages."""
+ return _is_skip_summary_message(message)
+
+
+def _is_generic_draft_deleted(message):
+ """Return True for legacy draft-rollback lines without error details."""
+ text = _message_without_record_prefix(message).rstrip(".").lower()
+ return (
+ text.startswith("draft ")
+ and (
+ " deleted due to errors" in text
+ or " is deleted due to errors" in text
+ or " deleted after create failed" in text
+ )
+ )
+
+
+def _flatten_error_payload(value, prefix=""):
+ """Flatten nested error payloads into readable field messages."""
+ if isinstance(value, dict):
+ parts = []
+ for key, item in value.items():
+ key_text = "" if isinstance(key, int) else str(key)
+ next_prefix = prefix
+ if key_text:
+ next_prefix = f"{prefix}.{key_text}" if prefix else key_text
+ parts.extend(_flatten_error_payload(item, next_prefix))
+ return parts
+
+ if isinstance(value, list):
+ if value and all(not isinstance(item, (dict, list)) for item in value):
+ message = ", ".join(_compact(item) for item in value if _compact(item))
+ if not message:
+ return []
+ return [f"{prefix}: {message}" if prefix else message]
+
+ parts = []
+ for item in value:
+ parts.extend(_flatten_error_payload(item, prefix))
+ return parts
+
+ text = _compact(value)
+ if not text:
+ return []
+ return [f"{prefix}: {text}" if prefix else text]
+
+
+# Error payload decoding -----------------------------------------------------
+
+def _decode_error_repr(value):
+ """Decode Python-style list/dict error payloads when present."""
+ text = str(value or "").strip()
+ if not (text.startswith("[") or text.startswith("{")):
+ return text
+ try:
+ parsed = ast.literal_eval(text)
+ except (ValueError, SyntaxError):
+ return text
+
+ parts = _flatten_error_payload(parsed)
+ return "; ".join(parts) if parts else text
+
+
+def _parse_skipped_entry_message(message):
+ """Extract the entry id and inner error from a legacy skipped-entry message."""
+ compact_message = _compact(message)
+ if not compact_message.startswith(SKIPPED_ENTRY_PREFIX):
+ return None, None
+ entry_payload, separator, errors_payload = compact_message[
+ len(SKIPPED_ENTRY_PREFIX) :
+ ].partition(SKIPPED_ENTRY_DELIMITER)
+ if not separator:
+ return None, None
+
+ entry_id = _extract_entry_id(entry_payload)
+ errors = _decode_error_repr(errors_payload)
+ return entry_id, errors
+
+
+def _humanize_reason(reason):
+ """Turn internal log text into a readable group title."""
+ reason = _decode_error_repr(_compact(reason))
+ reason = _remove_bracket_block(reason, BRACKET_ENTRY_PREFIX)
+ reason = _remove_bracket_block(reason, BRACKET_INSPIRE_PREFIX)
+ reason = _strip_inline_inspire_ids(reason)
+ reason = reason.replace("failed transformation. Errors:", "")
+ reason = reason.replace("Errors:", "")
+ reason = reason.replace("failed transformation.", "")
+ reason = _compact(reason)
+
+ if _is_skip_summary_message(reason):
+ return "Entries were skipped because they already had errors."
+ if reason.startswith("Multiple records match:"):
+ return "Multiple CDS records match this INSPIRE record."
+ if reason == "Thesis publication date missing (thesis_info and imprint).":
+ return (
+ "The INSPIRE thesis record is missing a publication date in both "
+ "thesis_info and imprint."
+ )
+ if reason.startswith("checksum: Field may not be null."):
+ return "File upload failed because the checksum is missing."
+
+ vocabulary_match = _parse_vocabulary_warning(reason)
+ if vocabulary_match:
+ term, vocab_type = vocabulary_match
+ return (
+ f"The vocabulary value '{term}' is missing from the CDS "
+ f"'{vocab_type}' vocabulary."
+ )
+
+ if " not published because of validation errors: " in reason:
+ subject, details = reason.split(
+ " not published because of validation errors: ",
+ 1,
+ )
+ subject = subject[:1].upper() + subject[1:]
+ return (
+ f"{subject} could not be published because it failed "
+ f"validation: {details}"
+ )
+
+ return reason
+
+
+def _is_skipped_entry_log(message, report_kind):
+ """Return True when the log line represents a skipped entry/error."""
+ if report_kind == "skipped_entry":
+ return True
+ compact_message = _compact(message)
+ return compact_message.startswith(SKIPPED_ENTRY_PREFIX) or _is_skip_summary_message(
+ compact_message
+ )
+
+
+# Grouping helpers -----------------------------------------------------------
+
+def _collect_detailed_errors_by_record(hits):
+ """Map record ids to the most useful error line logged for that record."""
+ details = {}
+ for hit in hits:
+ source = _source(hit)
+ message = _compact(source.get("message"))
+ if _is_generic_draft_deleted(message) or _is_skip_summary_log(message):
+ continue
+ level = str(source.get("level") or "").upper()
+ if level not in {"ERROR", "WARNING"}:
+ continue
+ record_id = _extract_inspire_id(message) or _extract_entry_id(message)
+ if not record_id:
+ continue
+ detail = _message_without_record_prefix(message)
+ if not any(
+ marker in detail
+ for marker in (
+ "Validation error while processing entry",
+ "Error while processing entry",
+ "deleted after",
+ "not published",
+ "validation errors",
+ "checksum:",
+ "Field may not be null",
+ "does not have a checksum",
+ )
+ ):
+ continue
+ details[str(record_id)] = detail
+ return details
+
+
+def _enrich_draft_deleted_message(message, details_by_record):
+ """Attach the underlying failure to legacy draft-deletion log lines."""
+ if not _is_generic_draft_deleted(message):
+ return message
+
+ record_id = _extract_inspire_id(message) or _extract_entry_id(message)
+ draft_id = _extract_draft_id(message) or "unknown"
+ detail = details_by_record.get(str(record_id)) if record_id else None
+ if not detail:
+ return message
+
+ prefix = _leading_inspire_prefix(message)
+ if prefix:
+ prefix = f"{prefix} "
+ return f"{prefix}Draft {draft_id} deleted: {detail}"
+
+
+def _is_non_actionable_log(message):
+ """Return True for side-effect log lines that should not become failure groups."""
+ compact_message = _message_without_record_prefix(message)
+ return _is_skip_summary_log(compact_message)
+
+
+def _message_reason(message):
+ """Build a grouping key from the visible log message."""
+ reason = _compact(message)
+ _entry_id, skipped_reason = _parse_skipped_entry_message(reason)
+ if skipped_reason:
+ return skipped_reason
+ reason = _message_without_record_prefix(reason)
+ if reason.startswith("Draft ") and " deleted: " in reason:
+ reason = reason.split(" deleted: ", 1)[1]
+ reason = _remove_prefix(reason, "Error while processing entry:")
+ reason = _remove_prefix(reason.lstrip(), "Validation error while processing entry:")
+ reason = _remove_prefix(reason.lstrip(), SKIPPED_ENTRY_PREFIX)
+ reason = _remove_bracket_block(reason, BRACKET_ENTRY_PREFIX)
+ if SKIPPED_ENTRY_DELIMITER in reason:
+ reason = reason.split(SKIPPED_ENTRY_DELIMITER, 1)[1]
+ return _compact(reason)
+
+
+def _normalize_log_hit(hit, details_by_record=None):
+ """Convert a raw log hit into the fields used by the report view."""
+ source = _source(hit)
+ message = _compact(source.get("message"))
+ if details_by_record:
+ message = _enrich_draft_deleted_message(message, details_by_record)
+ reason = _compact(
+ _first_value(source, "report_reason", ("extra", "report_reason"))
+ )
+ group_key = _compact(
+ _first_value(
+ source,
+ "report_group_key",
+ ("extra", "report_group_key"),
+ )
+ or reason
+ )
+ level = str(source.get("level") or "INFO").upper()
+ report_kind = _first_value(source, "report_kind", ("extra", "report_kind"))
+ effective_level = (
+ "ERROR" if _is_skipped_entry_log(message, report_kind) else level
+ )
+ if effective_level == "CRITICAL":
+ effective_level = "ERROR"
+
+ inspire_id = _first_value(
+ source,
+ "inspire_id",
+ ("extra", "inspire_id"),
+ ("context", "inspire_id"),
+ )
+ if inspire_id is None:
+ inspire_id = _extract_inspire_id(message)
+ entry_id = _first_value(
+ source,
+ "entry_id",
+ ("extra", "entry_id"),
+ ("entry", "id"),
+ )
+ if entry_id is None:
+ entry_id = _extract_entry_id(message)
+ skipped_entry_id, skipped_reason = _parse_skipped_entry_message(message)
+ if entry_id is None and skipped_entry_id is not None:
+ entry_id = skipped_entry_id
+ if inspire_id is None and entry_id is not None:
+ inspire_id = entry_id
+
+ fallback_reason = _message_reason(message)
+ raw_reason = reason or fallback_reason
+ raw_group_key = group_key or raw_reason
+ stable_group_key = _humanize_reason(group_key or fallback_reason)
+ if _is_skipped_entry_log(message, report_kind):
+ raw_group_key = raw_reason
+ stable_group_key = _humanize_reason(raw_reason)
+
+ reason = _humanize_reason(raw_reason)
+ group_key = stable_group_key
+
+ raw_message = message or _("No log message")
+ return {
+ "timestamp": format_timestamp(
+ _first_value(source, "timestamp", "@timestamp")
+ ),
+ "level": effective_level,
+ "message": raw_message,
+ "display_message": reason or raw_message,
+ "report_kind": report_kind,
+ "raw_report_reason": raw_reason,
+ "raw_report_group_key": raw_group_key,
+ "report_reason": reason,
+ "report_group_key": group_key,
+ "inspire_id": str(inspire_id) if inspire_id is not None else None,
+ "entry_id": str(entry_id) if entry_id is not None else None,
+ "record_id": (
+ str(inspire_id)
+ if inspire_id is not None
+ else str(entry_id) if entry_id is not None else None
+ ),
+ }
+
+
+def _format_plain_text_line(entry):
+ """Render a single log line for the download view."""
+ return f"[{entry['timestamp']}] {entry['level']} {entry['display_message']}".rstrip()
+
+
+# Run resolution and fetching ------------------------------------------------
+
def resolve_harvester_run(run_id):
"""Return a top-level INSPIRE harvester run or raise ``HarvesterRunError``."""
run_id = (run_id or "").strip()
@@ -64,17 +579,32 @@ def resolve_harvester_run(run_id):
def fetch_harvester_run_logs(run):
- """Return ``(hits, total)`` from structured job logs."""
+ """Return ``(hits, total)`` for a harvester run."""
try:
- result = current_jobs_logs_service.search(
- system_identity,
- params={
- "q": f'"{run.id}"',
- "sort": "timestamp",
+ full_index_name = prefix_index(current_app.config["JOBS_LOGGING_INDEX"])
+ max_results = current_app.config.get("JOBS_LOGS_MAX_RESULTS", 2000)
+ search_query = {
+ "query": {
+ "bool": {
+ "filter": [
+ {"term": {"context.run_id": str(run.id)}},
+ {"term": {"context.job_id": str(run.job_id)}},
+ ]
+ }
},
+ "sort": [
+ {"@timestamp": {"order": "asc"}},
+ {"_id": {"order": "asc"}},
+ ],
+ "size": max_results,
+ "track_total_hits": True,
+ }
+ response = current_search_client.search(
+ index=full_index_name,
+ body=search_query,
)
- hits = list(result.hits)
- total = result.total or len(hits)
+ hits = response.get("hits", {}).get("hits", [])
+ total = response.get("hits", {}).get("total", {}).get("value", len(hits))
except Exception:
current_app.logger.exception(
"Failed to fetch structured job logs for harvester run %s", run.id
@@ -84,33 +614,97 @@ def fetch_harvester_run_logs(run):
return hits, total
-def lines_from_hits(hits):
- """Return de-duplicated log lines and severity counts."""
- task_groups = {}
+def group_log_hits(hits, max_examples=5):
+ """Group structured warning/error log hits by their report key."""
+ details_by_record = _collect_detailed_errors_by_record(hits)
+ records_with_draft_deleted = set()
+ for hit in hits:
+ message = _compact(_source(hit).get("message"))
+ if _is_generic_draft_deleted(message):
+ record_id = _extract_inspire_id(message) or _extract_entry_id(message)
+ if record_id:
+ records_with_draft_deleted.add(str(record_id))
+
seen = set()
- error_count = 0
- warning_count = 0
+ buckets = OrderedDict()
+ other_lines = []
+
for hit in hits:
- raw_ts = hit.get("timestamp")
- level = hit.get("level", "INFO")
- message = re.sub(r"\s+", " ", (hit.get("message") or "")).strip()
- key = (raw_ts, level, message)
- if key in seen:
+ raw_message = _compact(_source(hit).get("message"))
+ entry = _normalize_log_hit(hit, details_by_record)
+ if _is_non_actionable_log(entry["message"]):
continue
- seen.add(key)
- if level == "ERROR":
- error_count += 1
- elif level == "WARNING":
- warning_count += 1
- task_id = (hit.get("context") or {}).get("task_id") or "unknown"
- task_groups.setdefault(task_id, []).append(
- f"[{format_timestamp(raw_ts)}] {level} {message}"
+
+ record_id = str(entry.get("record_id") or "")
+ if (
+ record_id in records_with_draft_deleted
+ and not _is_generic_draft_deleted(raw_message)
+ and record_id in details_by_record
+ and _message_without_record_prefix(raw_message).rstrip(".")
+ == details_by_record[record_id].rstrip(".")
+ ):
+ continue
+
+ dedupe_key = (
+ entry["timestamp"],
+ entry["level"],
+ entry["message"],
+ entry["record_id"],
)
- lines = [line for group in task_groups.values() for line in group]
- return lines, error_count, warning_count
+ if dedupe_key in seen:
+ continue
+ seen.add(dedupe_key)
+
+ if entry["level"] in {"ERROR", "WARNING"} and entry["report_group_key"]:
+ bucket_key = (entry["level"], entry["report_group_key"])
+ bucket = buckets.setdefault(
+ bucket_key,
+ {
+ "level": entry["level"],
+ "title": (
+ entry["raw_report_reason"] or entry["raw_report_group_key"]
+ ),
+ "entries": [],
+ "records": [],
+ "examples": [],
+ "first_timestamp": entry["timestamp"],
+ },
+ )
+ bucket["entries"].append(entry)
+ if entry["record_id"] and entry["record_id"] not in bucket["records"]:
+ bucket["records"].append(entry["record_id"])
+ if entry["inspire_id"] and entry["inspire_id"] not in bucket["examples"]:
+ bucket["examples"].append(entry["inspire_id"])
+ continue
+
+ other_lines.append(entry)
+ grouped_issues = []
+ for bucket in buckets.values():
+ grouped_issues.append(
+ {
+ "level": bucket["level"],
+ "title": bucket["title"],
+ "count": len(bucket["records"]) or len(bucket["entries"]),
+ "records": bucket["records"],
+ "examples": bucket["examples"][:max_examples],
+ "entries": bucket["entries"],
+ "first_timestamp": bucket["first_timestamp"],
+ }
+ )
-def plain_text_log(run, lines, total, error_count, warning_count):
+ grouped_issues.sort(
+ key=lambda issue: (-issue["count"], issue["first_timestamp"], issue["title"])
+ )
+
+ error_count = sum(1 for issue in grouped_issues if issue["level"] == "ERROR")
+ warning_count = sum(1 for issue in grouped_issues if issue["level"] == "WARNING")
+ return grouped_issues, other_lines, error_count, warning_count
+
+
+# Report rendering -----------------------------------------------------------
+
+def plain_text_log(run, grouped_issues, other_lines, total, error_count, warning_count):
"""Build the plain-text log file content."""
max_results = current_app.config.get("JOBS_LOGS_MAX_RESULTS", 2000)
status = getattr(run.status, "name", str(run.status))
@@ -121,6 +715,10 @@ def plain_text_log(run, lines, total, error_count, warning_count):
if run.finished_at:
header.append(f"Finished: {format_timestamp(run.finished_at)}")
+ run_stats = _run_stats(run)
+ if run_stats:
+ header.append(f"Records: {run_stats}")
+
summary = []
if status in ("FAILED", "PARTIAL_SUCCESS", "SUCCESS"):
summary.append(
@@ -133,40 +731,59 @@ def plain_text_log(run, lines, total, error_count, warning_count):
if run.message:
summary.append(run.message)
if error_count:
- summary.append(_("%(count)s error(s) found in logs below", count=error_count))
+ summary.append(_("%(count)s grouped error(s) found below", count=error_count))
if warning_count:
summary.append(
- _("%(count)s warning(s) found in logs below", count=warning_count)
+ _("%(count)s grouped warning(s) found below", count=warning_count)
)
if summary:
header.append("")
header.extend(summary)
- if total and total > len(lines):
+ rendered_lines = sum(len(issue["entries"]) for issue in grouped_issues) + len(
+ other_lines
+ )
+ if total and total > rendered_lines:
header.append(
- f"Showing first {len(lines)} of {total} log entries "
+ f"Showing first {rendered_lines} of {total} log entries "
f"(truncated at JOBS_LOGS_MAX_RESULTS={max_results})."
)
header.append("=" * 80)
- logs = "\n".join(header + lines)
- if not lines:
- logs += "\n" + (run.message or "No logs available for this run.\n")
- return logs
+ body = []
+ for issue in grouped_issues:
+ body.append(f"{issue['level']}: {issue['title']}")
+ if issue["records"]:
+ body.append("Affected records: " + ", ".join(issue["records"]))
+ body.extend(_format_plain_text_line(entry) for entry in issue["entries"])
+ body.append("")
+
+ if other_lines:
+ body.append("Other log lines")
+ body.append("-" * 80)
+ body.extend(_format_plain_text_line(entry) for entry in other_lines)
+
+ logs = "\n".join(header + body).rstrip()
+ if not body:
+ logs += "\n" + (run.message or "No logs available for this run.")
+ return logs + "\n"
def report_context(run_id):
"""Build context for the colored HTML report page."""
run = resolve_harvester_run(run_id)
hits, total = fetch_harvester_run_logs(run)
- lines, error_count, _unused_warnings = lines_from_hits(hits)
+ grouped_issues, other_lines, error_count, warning_count = group_log_hits(hits)
status = getattr(run.status, "name", str(run.status))
+ rendered_lines = sum(len(issue["entries"]) for issue in grouped_issues) + len(
+ other_lines
+ )
truncation_message = None
- if total and total > len(lines):
+ if total and total > rendered_lines:
truncation_message = (
f"Log results truncated. Too many log results returned ({total}). "
- f"Only the most recent {len(lines)} results are shown."
+ f"Only the most recent {rendered_lines} results are shown."
)
display_title = (getattr(run, "title", None) or "").strip() or f"Run {run.id}"
@@ -177,6 +794,18 @@ def report_context(run_id):
"started_at": format_timestamp(run.started_at),
"finished_at": format_timestamp(run.finished_at) if run.finished_at else None,
"truncation_message": truncation_message,
- "lines": lines,
+ "grouped_errors": [
+ issue for issue in grouped_issues if issue["level"] == "ERROR"
+ ],
+ "grouped_warnings": [
+ issue for issue in grouped_issues if issue["level"] == "WARNING"
+ ],
+ "other_lines": other_lines,
"error_count": error_count,
+ "warning_count": warning_count,
+ "run_stats": _run_stats(run),
+ "failure_summary_lines": [
+ line.strip() for line in (run.message or "").splitlines() if line.strip()
+ ],
+ "inspire_literature_url": INSPIRE_LITERATURE_URL,
}
diff --git a/site/cds_rdm/inspire_harvester/writer.py b/site/cds_rdm/inspire_harvester/writer.py
index 3917c118..e60297bc 100644
--- a/site/cds_rdm/inspire_harvester/writer.py
+++ b/site/cds_rdm/inspire_harvester/writer.py
@@ -6,7 +6,6 @@
# the terms of the MIT License; see LICENSE file for more details.
"""Writer module."""
-import logging
import time
from collections import OrderedDict
from copy import deepcopy
@@ -25,18 +24,120 @@
from cds_rdm.inspire_harvester.logger import hlog
from cds_rdm.inspire_harvester.update.config import UPDATE_STRATEGY_CONFIG
-from cds_rdm.inspire_harvester.update.engine import (
- UpdateContext,
- UpdateEngine,
- UpdateEngineConflict,
-)
+from cds_rdm.inspire_harvester.update.engine import UpdateContext, UpdateEngine
class InspireWriter(BaseWriter):
"""INSPIRE writer."""
+ @staticmethod
+ def _compact_error(value):
+ """Collapse whitespace in exception details."""
+ return " ".join(str(value or "").split()).strip()
+
+ @classmethod
+ def _flatten_validation_errors(cls, value, prefix=""):
+ """Flatten nested validation payloads into readable field messages."""
+ if isinstance(value, dict):
+ parts = []
+ for key, item in value.items():
+ key_text = "" if isinstance(key, int) else str(key)
+ next_prefix = prefix
+ if key_text:
+ next_prefix = f"{prefix}.{key_text}" if prefix else key_text
+ parts.extend(cls._flatten_validation_errors(item, next_prefix))
+ return parts
+
+ if isinstance(value, list):
+ if value and all(not isinstance(item, (dict, list)) for item in value):
+ message = ", ".join(
+ cls._compact_error(item) for item in value if cls._compact_error(item)
+ )
+ if not message:
+ return []
+ return [f"{prefix}: {message}" if prefix else message]
+
+ parts = []
+ for item in value:
+ parts.extend(cls._flatten_validation_errors(item, prefix))
+ return parts
+
+ text = cls._compact_error(value)
+ if not text:
+ return []
+ return [f"{prefix}: {text}" if prefix else text]
+
+ @classmethod
+ def _format_validation_error(cls, error):
+ """Return a readable validation error summary."""
+ messages = getattr(error, "messages", error)
+ parts = cls._flatten_validation_errors(messages)
+ return "; ".join(parts) if parts else cls._compact_error(messages)
+
+ @classmethod
+ def _describe_exception(cls, error):
+ """Return a compact exception label for logs and report messages."""
+ details = cls._compact_error(error)
+ if details:
+ return f"{error.__class__.__name__}: {details}"
+ return error.__class__.__name__
+
+ @staticmethod
+ def _context_labels(inspire_id=None, record_pid=None, draft_id=None, file_key=None):
+ """Build readable context labels for developer-facing logs."""
+ labels = []
+ if inspire_id:
+ labels.append(f"INSPIRE#{inspire_id}")
+ if record_pid:
+ labels.append(f"record {record_pid}")
+ if draft_id:
+ labels.append(f"draft {draft_id}")
+ if file_key:
+ labels.append(f"file '{file_key}'")
+ return labels
+
+ def _raise_unexpected_operation_error(
+ self,
+ *,
+ subject,
+ action,
+ error,
+ logger=None,
+ inspire_id=None,
+ record_pid=None,
+ draft_id=None,
+ file_key=None,
+ ):
+ """Log traceback-rich context and raise a readable writer error."""
+ context = ", ".join(
+ self._context_labels(
+ inspire_id=inspire_id,
+ record_pid=record_pid,
+ draft_id=draft_id,
+ file_key=file_key,
+ )
+ )
+ log_message = f"Unexpected error while handling {subject}"
+ if context:
+ log_message = f"{log_message} ({context})"
+
+ active_logger = logger or current_app.logger
+ active_logger.exception(log_message)
+
+ raise self._writer_error(
+ f"The {subject} could not be {action} because "
+ f"{self._describe_exception(error)}"
+ ) from error
+
+ @staticmethod
+ def _writer_error(message):
+ """Return a compact ``WriterError`` instance."""
+ return WriterError(message)
+
@hlog
- def _write_entry(self, stream_entry, *args, inspire_id=None, logger=None, **kwargs):
+ def _write_entry(
+ self, stream_entry, *args, inspire_id=None, logger=None, **kwargs
+ ):
"""Write entry to CDS."""
existing_records = self._get_existing_records(stream_entry)
@@ -47,12 +148,9 @@ def _write_entry(self, stream_entry, *args, inspire_id=None, logger=None, **kwar
existing_records_hits = existing_records.to_dict()["hits"]["hits"]
existing_records_ids = [hit["id"] for hit in existing_records_hits]
if multiple_records_found:
-
msg = "Multiple records match: {0}".format(", ".join(existing_records_ids))
-
logger.error(msg)
- stream_entry.errors.append(f"[inspire_id={inspire_id}] {msg}")
-
+ stream_entry.errors.append(msg)
return None
elif should_update:
@@ -84,15 +182,12 @@ def _process_entry(
try:
op_type = self._write_entry(stream_entry, *args, **kwargs)
except WriterError as e:
- error_message = f"Error while processing entry : {str(e)}."
+ error_message = self._compact_error(e)
except ValidationError as e:
- error_message = f"Validation error while processing entry: {str(e)}."
- # except Exception as e:
- # raise e
- # error_message = f"Unexpected error while processing entry: {str(e)}."
+ error_message = self._format_validation_error(e)
if error_message:
- logger.error(error_message)
- stream_entry.errors.append(f"[inspire_id={inspire_id}] {error_message}")
+ logger.error(f"Error while processing entry: {error_message}")
+ stream_entry.errors.append(error_message)
stream_entry.op_type = op_type
return stream_entry
@@ -172,11 +267,12 @@ def _get_existing_records(
},
)
- for filter_key, filter in filters_priority.items():
-
- if filter["value"]:
- combined_filter = dsl.Q("bool", filter=filter["filter"])
- logger.debug(f"Searching for existing records: {filter['filter']}")
+ for search_filter in filters_priority.values():
+ if search_filter["value"]:
+ combined_filter = dsl.Q("bool", filter=search_filter["filter"])
+ logger.debug(
+ f"Searching for existing records: {search_filter['filter']}"
+ )
result = current_rdm_records_service.search(
system_identity, extra_filter=combined_filter
@@ -219,10 +315,8 @@ def update_record(
)
# Normalize the checksum format in existing for comparison
- existing_checksums = [
- value["checksum"] for key, value in existing_files.items()
- ]
- new_checksums = [value["checksum"] for key, value in new_files.items()]
+ existing_checksums = [value["checksum"] for value in existing_files.values()]
+ new_checksums = [value["checksum"] for value in new_files.values()]
logger.debug(f"Existing files' checksums: {existing_checksums}.")
logger.debug(f"New files' checksums: {new_checksums}.")
@@ -244,7 +338,7 @@ def update_record(
engine = UpdateEngine(
strategies=UPDATE_STRATEGY_CONFIG,
- fail_on_conflict=True
+ fail_on_conflict=True,
)
ctx = UpdateContext(source="inspire_import")
result = engine.update(record_dict, entry, ctx, logger)
@@ -279,20 +373,27 @@ def update_record(
logger.info(f"Success: Record {record_pid} updated and published.")
except ValidationError as e:
- logger.error(
- f"Failure: draft {record_pid} not published, validation errors: {e}."
- )
current_rdm_records_service.delete_draft(system_identity, draft["id"])
- raise e
+ raise self._writer_error(
+ "The updated draft could not be published because it "
+ f"failed validation: {self._format_validation_error(e)}"
+ )
except ValidationErrorWithMessageAsList as e:
current_rdm_records_service.delete_draft(system_identity, draft["id"])
- raise WriterError(
- f"ERROR: Draft {draft['id']} not published, validation errors: {e.messages}."
+ raise self._writer_error(
+ "The updated draft could not be published because it "
+ f"failed validation: {self._format_validation_error(e)}"
)
except Exception as e:
current_rdm_records_service.delete_draft(system_identity, draft["id"])
- raise WriterError(
- f"Draft {draft.id} failed publishing because of an unexpected error: {str(e)}."
+ self._raise_unexpected_operation_error(
+ subject=f"updated draft {draft.id}",
+ action="published",
+ error=e,
+ logger=logger,
+ inspire_id=inspire_id,
+ record_pid=record_pid,
+ draft_id=draft.id,
)
@hlog
@@ -306,16 +407,14 @@ def _update_files(
logger=None,
):
entry = stream_entry.entry
- logger.info("Updating files for record {}".format(record.id))
+ logger.info(f"Updating files for record {record.id}")
record_dict = record.data
existing_files = record_dict["files"]["entries"]
new_files = entry["files"].get("entries", {})
- existing_checksums = [
- value["checksum"] for key, value in existing_files.items()
- ]
- new_checksums = [value["checksum"] for key, value in new_files.items()]
+ existing_checksums = [value["checksum"] for value in existing_files.values()]
+ new_checksums = [value["checksum"] for value in new_files.values()]
files_to_create = list(set(new_checksums) - set(existing_checksums))
files_to_delete = list(set(existing_checksums) - set(new_checksums))
@@ -345,15 +444,22 @@ def _update_files(
file_content = self._fetch_file(stream_entry, inspire_url)
if not file_content:
- logger.error(f"Failed to fetch file content for: {key}")
- return
+ raise self._writer_error(
+ f"Could not fetch the INSPIRE file '{key}' after repeated download failures."
+ )
self._create_file(stream_entry, file, file_content, new_draft)
logger.info(f"{len(new_files.items())} files successfully created.")
@hlog
def _create_new_version(
- self, stream_entry, update_metadata, record, inspire_id=None, record_pid=None, logger=None
+ self,
+ stream_entry,
+ update_metadata,
+ record,
+ inspire_id=None,
+ record_pid=None,
+ logger=None,
):
"""For records with updated files coming from INSPIRE, create and publish a new version."""
entry = stream_entry.entry
@@ -405,15 +511,30 @@ def _create_new_version(
current_rdm_records_service.delete_draft(
system_identity, new_version_draft.id
)
- raise WriterError(
- f"Failure: Draft {new_version_draft.id} not published, validation errors: {e}."
+ raise self._writer_error(
+ "The new version draft could not be published because it "
+ f"failed validation: {self._format_validation_error(e)}"
)
except ValidationErrorWithMessageAsList as e:
current_rdm_records_service.delete_draft(
system_identity, new_version_draft.id
)
- raise WriterError(
- f"Failure: draft {new_version_draft.id} not published, validation errors: {e.messages}."
+ raise self._writer_error(
+ "The new version draft could not be published because it "
+ f"failed validation: {self._format_validation_error(e)}"
+ )
+ except Exception as e:
+ current_rdm_records_service.delete_draft(
+ system_identity, new_version_draft.id
+ )
+ self._raise_unexpected_operation_error(
+ subject=f"new version draft {new_version_draft.id}",
+ action="published",
+ error=e,
+ logger=logger,
+ inspire_id=inspire_id,
+ record_pid=record_pid,
+ draft_id=new_version_draft.id,
)
@hlog
@@ -443,7 +564,9 @@ def _create_new_record(
DATACITE_PREFIX = current_app.config["DATACITE_PREFIX"]
is_cds = DATACITE_PREFIX in doi.get("identifier", "")
if is_cds:
- raise WriterError("Trying to create record with CDS DOI")
+ raise self._writer_error(
+ "The INSPIRE record points to a CDS DOI, so it cannot be created as a new CDS record."
+ )
file_entries = entry["files"].get("entries", None)
logger.debug(f"Files to create: {len(file_entries) if file_entries else 0}")
@@ -451,6 +574,7 @@ def _create_new_record(
logger.debug("Creating new record draft")
draft = current_rdm_records_service.create(system_identity, data=entry)
+ current_file_key = None
logger.info(f"New draft is created ({draft.id}).")
@@ -461,22 +585,46 @@ def _create_new_record(
)
for key, file_data in file_entries.items():
+ current_file_key = key
logger.debug(f"Processing file: {key}")
+ if not file_data.get("checksum"):
+ raise self._writer_error(
+ "The INSPIRE file "
+ f"'{file_data.get('key', key)}' cannot be imported because "
+ "INSPIRE did not provide a checksum."
+ )
+
inspire_url = file_data.pop("source_url", None)
file_content = self._fetch_file(stream_entry, inspire_url)
if not file_content:
- logger.error(f"Failed to fetch file content for: {key}")
-
- return
+ raise self._writer_error(
+ f"Could not fetch the INSPIRE file '{key}' after repeated download failures."
+ )
self._create_file(stream_entry, file_data, file_content, draft)
logger.info(f"All the files successfully created.")
+ except ValidationError as e:
+ current_rdm_records_service.delete_draft(system_identity, draft["id"])
+ raise self._writer_error(
+ "The new draft could not be created because file validation "
+ f"failed: {self._format_validation_error(e)}"
+ ) from e
+ except WriterError:
+ current_rdm_records_service.delete_draft(system_identity, draft["id"])
+ raise
except Exception as e:
current_rdm_records_service.delete_draft(system_identity, draft["id"])
- logger.error(f"Draft {draft.id} is deleted due to errors.")
- raise e
+ self._raise_unexpected_operation_error(
+ subject=f"file import for new draft {draft.id}",
+ action="completed",
+ error=e,
+ logger=logger,
+ inspire_id=inspire_id,
+ draft_id=draft.id,
+ file_key=current_file_key,
+ )
else:
try:
self._add_community(stream_entry, draft)
@@ -489,17 +637,26 @@ def _create_new_record(
except ValidationError as e:
current_rdm_records_service.delete_draft(system_identity, draft["id"])
- raise WriterError(
- f"Failure: draft {draft['id']} not published, validation errors: {e}."
+ raise self._writer_error(
+ "The new draft could not be published because it failed "
+ f"validation: {self._format_validation_error(e)}"
)
except ValidationErrorWithMessageAsList as e:
current_rdm_records_service.delete_draft(system_identity, draft["id"])
- raise WriterError(
- f"Failure: draft {draft['id']} not published, validation errors: {e.messages}."
+ raise self._writer_error(
+ "The new draft could not be published because it failed "
+ f"validation: {self._format_validation_error(e)}"
)
except Exception as e:
current_rdm_records_service.delete_draft(system_identity, draft["id"])
- raise e
+ self._raise_unexpected_operation_error(
+ subject=f"new draft {draft.id}",
+ action="published",
+ error=e,
+ logger=logger,
+ inspire_id=inspire_id,
+ draft_id=draft.id,
+ )
@hlog
def _fetch_file(
@@ -544,10 +701,7 @@ def _fetch_file(
logger.debug("Retrying in 1 minute...")
time.sleep(60)
- logger.error(
- f"Retrieving file request failed. Max retries {max_retries} reached."
- f" URL: {inspire_url}."
- )
+ return None
@hlog
def _create_file(
@@ -567,6 +721,13 @@ def _create_file(
file_source = file_data.get("source")
new_checksum = None
+ if not inspire_checksum:
+ raise self._writer_error(
+ "The INSPIRE file "
+ f"'{file_data['key']}' cannot be imported because INSPIRE did "
+ "not provide a checksum."
+ )
+
try:
service.draft_files.init_files(
system_identity,
@@ -599,14 +760,9 @@ def _create_file(
assert inspire_checksum == new_checksum
elif inspire_checksum and file_source == "arxiv":
assert inspire_checksum == new_checksum
- except AssertionError as e:
- ## TODO draft? delete record completely?
- logger.error(
- f"Files checksums don't match. Delete file: '{file_data['key']}' from draft."
- )
-
+ except AssertionError:
service.draft_files.delete_file(system_identity, draft.id, file_data["key"])
-
- raise WriterError(
- f"File {file_data['key']} checksum mismatch. Expected: {inspire_checksum}, got: {new_checksum}."
- )
\ No newline at end of file
+ raise self._writer_error(
+ "File checksum mismatch for "
+ f"'{file_data['key']}'. Expected {inspire_checksum}, got {new_checksum}."
+ )
diff --git a/site/cds_rdm/templates/semantic-ui/cds_rdm/harvester_download/report_body.html b/site/cds_rdm/templates/semantic-ui/cds_rdm/harvester_download/report_body.html
index 761ba03a..f42747a7 100644
--- a/site/cds_rdm/templates/semantic-ui/cds_rdm/harvester_download/report_body.html
+++ b/site/cds_rdm/templates/semantic-ui/cds_rdm/harvester_download/report_body.html
@@ -4,9 +4,62 @@
CDS-RDM is free software; you can redistribute it and/or modify it
under the terms of the GPL-2.0 License; see LICENSE file for more details.
-Job run report body: layout aligned with invenio_jobs RunsLogs. Presentation-only
-logic lives here; resource.py only supplies run, lines, and counts.
+Job run report body: layout aligned with invenio_jobs RunsLogs. Grouped issues
+use collapsible sections; expanded content shows all affected records and
+per-record log lines in the same format as the stock job log viewer.
#}
+{% macro render_issue_group(issue, level_name, label_color) %}
+
+ {{ level_name }}
+ {{ issue.title }}
+
+ {{ _("%(count)s records", count=issue.count) }}
+
+ {% if issue.examples %}
+
+ {{ _("Examples") }}:
+ {% for inspire_id in issue.examples %}
+
+ INSPIRE#{{ inspire_id }}
+ {% if not loop.last %}, {% endif %}
+ {% endfor %}
+
+ {% endif %}
+
+
{{ _("No log lines in this view.") }}
- {% endif %}