Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion hed/errors/schema_error_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,15 @@ def schema_error_SCHEMA_CONVERSION_FACTOR_NOT_POSITIVE(tag, conversion_factor):
@hed_error(
SchemaAttributeErrors.SCHEMA_HED_ID_INVALID, actual_code=SchemaAttributeErrors.SCHEMA_ATTRIBUTE_VALUE_INVALID
)
def schema_error_SCHEMA_HED_ID_INVALID(tag, new_id, old_id=None, valid_min=None, valid_max=None):
def schema_error_SCHEMA_HED_ID_INVALID(
tag, new_id, old_id=None, valid_min=None, valid_max=None, duplicate_tag=None, duplicate_tag_section=None
):
if duplicate_tag:
section_info = f" in the '{duplicate_tag_section}' section" if duplicate_tag_section else ""
return (
f"Tag '{tag}' has hedId '{new_id}' which is already used by '{duplicate_tag}'{section_info}. "

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: double space before "Each hedId" — should be a single space to match the style of the other messages in this function.

Suggested change
f"Tag '{tag}' has hedId '{new_id}' which is already used by '{duplicate_tag}'{section_info}. "
f"Tag '{tag}' has hedId '{new_id}' which is already used by '{duplicate_tag}'{section_info}. "

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix on next PR.

f"Each hedId must be unique across all schema sections."
)
if old_id:
return (
f"Tag '{tag}' has an invalid hedId '{new_id}'. "
Expand Down
9 changes: 6 additions & 3 deletions hed/schema/schema_io/df_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,12 @@ def load_dataframes(filenames):
elif os.path.exists(filename):
# Handle the extra files if they are present.
dataframes[key] = pd.read_csv(filename, sep="\t", dtype=str, na_filter=False)
except OSError:
# todo: consider if we want to report this error(we probably do)
pass # We will use a blank one for this
except FileNotFoundError:
pass # Missing section files are valid for partial/library schemas; caller gets an empty DataFrame
except OSError as e:
raise HedFileError(
HedExceptions.INVALID_FILE_FORMAT, f"Could not load schema file '{filename}': {e}", filename
) from e
return dataframes


Expand Down
7 changes: 3 additions & 4 deletions hed/schema/schema_io/hed_id_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def update_dataframes_from_schema(dataframes, schema, schema_name="", assign_mis
schema_name = schema.library
# 1. Verify existing HED ids don't conflict between schema/dataframes
for df_key, df in dataframes.items():
if df_key in constants.DF_SUFFIXES:
if df_key in constants.DF_EXTRAS:
Comment thread
VisLab marked this conversation as resolved.
continue
section_key = constants.section_mapping_hed_id.get(df_key)
if not section_key:
Expand Down Expand Up @@ -173,7 +173,7 @@ def _verify_hedid_matches(section, df, unused_tag_ids):
id_value = df_id.removeprefix("HED_")
try:
id_int = int(id_value)
if id_int not in unused_tag_ids:
if unused_tag_ids and id_int not in unused_tag_ids:
hedid_errors += schema_util.format_error(
row_number,
row,
Expand Down Expand Up @@ -213,8 +213,7 @@ def assign_hed_ids_section(df, unused_tag_ids):
# we already verified existing ones
if hed_id:
continue
hed_id = f"HED_{sorted_unused_ids.pop():07d}"
row[constants.hed_id] = hed_id
df.at[_row_number, constants.hed_id] = f"HED_{sorted_unused_ids.pop():07d}"


def merge_dfs(dest_df, source_df):
Expand Down
34 changes: 34 additions & 0 deletions hed/schema/schema_validation/compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def check_compliance(hed_schema, check_for_warnings=True, name=None, error_handl
issues += validator.check_invalid_characters()
issues += validator.check_attributes()
issues += validator.check_duplicate_names()
issues += validator.check_duplicate_hed_ids()
issues += validator.check_extras_columns()
issues += validator.check_annotation_attribute_values()

Expand Down Expand Up @@ -309,6 +310,39 @@ def check_attributes(self):
self.summary.record_issues(len(issues))
return issues

def check_duplicate_hed_ids(self):
"""Check for duplicate hedId values across all schema sections."""
self.summary.start_check(
"duplicate_hed_ids",
"Check for duplicate hedId values within or across schema sections.",
)
issues = []
seen_ids: dict[str, tuple[str, str]] = {} # maps hedId string → (first tag name, section key)
for section_key in HedSectionKey:
section = self.hed_schema[section_key]
self.summary.record_section(section_key, len(section))
for entry in section.values():
hed_id = entry.attributes.get(HedKey.HedID)
if not hed_id:
continue
if hed_id in seen_ids:
first_tag_name, first_section_key = seen_ids[hed_id]
self.error_handler.push_error_context(ErrorContext.SCHEMA_SECTION, str(section_key))
self.error_handler.push_error_context(ErrorContext.SCHEMA_TAG, entry.name)
issues += self.error_handler.format_error_with_context(
SchemaAttributeErrors.SCHEMA_HED_ID_INVALID,
entry.name,
new_id=hed_id,
duplicate_tag=first_tag_name,
duplicate_tag_section=first_section_key,
)
self.error_handler.pop_error_context()
self.error_handler.pop_error_context()
else:
seen_ids[hed_id] = (entry.name, section_key.value)
self.summary.record_issues(len(issues))
return issues
Comment thread
VisLab marked this conversation as resolved.

def check_duplicate_names(self):
"""Check for duplicate entry names across library merges."""
self.summary.start_check(
Expand Down
56 changes: 35 additions & 21 deletions hed/schema/schema_validation/hed_id_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,30 @@ def _get_previous_version(version, library):
prev_version = f"{library}_{prev_version}"
return prev_version

def _get_old_id_int(self, tag_entry, tag_library):
"""Returns the integer hedId from the previous schema version for this entry, or None.

Parameters:
tag_entry (HedSchemaEntry): The schema entry being validated.
tag_library (str): The library prefix for this entry (empty string for standard schema).

Returns:
int or None: The previous hedId as an integer, or None if absent or unparsable.
"""
previous_schema = self._previous_schemas.get(tag_library)
if not previous_schema:
return None
old_entry = previous_schema.get_tag_entry(tag_entry.name, key_class=tag_entry.section_key)
if not old_entry:
return None
old_id_str = old_entry.attributes.get(HedKey.HedID)
if not old_id_str:
return None
try:
return int(old_id_str.removeprefix("HED_"))
except ValueError:
return None # Silently ignore invalid old_id values (shouldn't happen in practice)

def verify_tag_id(self, hed_schema, tag_entry, attribute_name):
"""Validates the hedID attribute values

Expand All @@ -72,31 +96,21 @@ def verify_tag_id(self, hed_schema, tag_entry, attribute_name):
issues(list): A list of issues from validating this attribute.
"""
# todo: If you have a way to know the schema should have 100% ids, you could check for that and flag missing
new_id = tag_entry.attributes.get(attribute_name, "")
old_id = None
tag_library = tag_entry.has_attribute(HedKey.InLibrary, return_value=True)
if not tag_library:
tag_library = ""
new_id_str = tag_entry.attributes.get(attribute_name, "")
tag_library = tag_entry.has_attribute(HedKey.InLibrary, return_value=True) or ""
old_id = self._get_old_id_int(tag_entry, tag_library)

previous_schema = self._previous_schemas.get(tag_library)
if previous_schema:
old_entry = previous_schema.get_tag_entry(tag_entry.name, key_class=tag_entry.section_key)
if old_entry:
old_id = old_entry.attributes.get(HedKey.HedID)

if old_id:
try:
old_id = int(old_id.removeprefix("HED_"))
except ValueError:
# Just silently ignore invalid old_id values(this shouldn't happen)
pass
if new_id:
new_id = None
if new_id_str:
try:
new_id = int(new_id.removeprefix("HED_"))
new_id = int(new_id_str.removeprefix("HED_"))
except ValueError:
Comment thread
VisLab marked this conversation as resolved.
return ErrorHandler.format_error(SchemaAttributeErrors.SCHEMA_HED_ID_INVALID, tag_entry.name, new_id)
return ErrorHandler.format_error(
SchemaAttributeErrors.SCHEMA_HED_ID_INVALID, tag_entry.name, new_id_str
)

# Nothing to verify
if new_id is None and old_id is None:
if not new_id_str and old_id is None:
return []

issues = []
Expand Down
64 changes: 53 additions & 11 deletions tests/schema/test_hed_id_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,24 @@ def test_not_int(self):
errors = _verify_hedid_matches(self.schema_82.tags, df, hed_id_util._get_hedid_range("", constants.TAG_KEY))
self.assertEqual(len(errors), 1)

def test_verify_unknown_library_skips_range_check(self):
"""An unregistered library returns empty range — IDs should not be reported as out-of-range."""
empty_range = set()
df = pd.DataFrame([{"rdfs:label": "Event", "hedId": "HED_0012001"}])
# testlib has no library_data entry, so _get_hedid_range returns an empty set
errors = _verify_hedid_matches(self.schema_82.tags, df, empty_range)
self.assertEqual(len(errors), 0, "Unknown-library empty range should not trigger range errors")

def test_empty_unused_ids_no_crash(self):
"""_verify_hedid_matches must not crash when unused_tag_ids is empty (covers min/max guard)."""
empty_range = set()
df = pd.DataFrame(
[{"rdfs:label": "Event", "hedId": "HED_0099999"}, {"rdfs:label": "Age-#", "hedId": "HED_0000001"}]
)
# Should complete without raising ValueError from min()/max()
errors = _verify_hedid_matches(self.schema_82.tags, df, empty_range)
self.assertEqual(len(errors), 0)

def test_get_all_ids_exists(self):
# Test when hedId column exists and has proper prefixed IDs
df = pd.DataFrame({"hedId": ["HED_0000001", "HED_0000002", "HED_0000003"]})
Expand Down Expand Up @@ -156,29 +174,53 @@ def test_assign_hed_ids_section(self):

self.assertTrue(df.equals(expected_result))

def test_assign_actually_mutates_df(self):
"""assign_hed_ids_section must write IDs back into the original DataFrame."""
df = pd.DataFrame({"hedId": ["", "", ""], "label": ["A", "B", "C"]})
assign_hed_ids_section(df, {1, 2, 3})
# All rows should now have a non-empty hedId
self.assertTrue(all(df["hedId"].str.startswith("HED_")), "IDs were not written into the DataFrame")

def test_assign_preserves_existing_ids(self):
"""assign_hed_ids_section must not overwrite rows that already have an ID."""
df = pd.DataFrame({"hedId": ["HED_0000005", "", "HED_0000010"], "label": ["A", "B", "C"]})
assign_hed_ids_section(df, {1, 2, 3, 4, 5, 10})
self.assertEqual(df.loc[0, "hedId"], "HED_0000005")
self.assertEqual(df.loc[2, "hedId"], "HED_0000010")
self.assertTrue(df.loc[1, "hedId"].startswith("HED_"))


class TestUpdateDataframes(unittest.TestCase):
def test_update_dataframes_from_schema(self):
# valid direction first
schema_dataframes = hed_schema_global.get_as_dataframes()
schema_83 = load_schema_version("8.3.0")
# Use matching schema + dataframes so the ID verification passes
schema = load_schema_version("8.4.0")
schema_dataframes = schema.get_as_dataframes()
# Add a test column and ensure it stays around
fixed_value = "test_column_value"
for _key, df in schema_dataframes.items():
df["test_column"] = fixed_value

updated_dataframes = update_dataframes_from_schema(schema_dataframes, schema_83)
updated_dataframes = update_dataframes_from_schema(schema_dataframes, schema)

for key, df in updated_dataframes.items():
if key not in constants.DF_EXTRAS:
self.assertTrue((df["test_column"] == fixed_value).all())
# this is expected to bomb horribly, since schema lacks many of the spreadsheet entries.
schema = load_schema_version("8.3.0")
schema_dataframes_new = load_schema_version("8.3.0").get_as_dataframes()
try:
update_dataframes_from_schema(schema_dataframes_new, schema)
except HedFileError as e:
self.assertEqual(len(e.issues), 115)

def test_conflict_detected(self):
"""Bug #1 regression: verify HedFileError IS raised when a hedId in the dataframe mismatches the schema."""
schema = load_schema_version("8.4.0")
schema_dataframes = schema.get_as_dataframes()

# Corrupt a hedId in the Tag dataframe so it mismatches the schema
tag_df = schema_dataframes[constants.TAG_KEY]
# Change the first non-empty hedId to an out-of-range value
non_empty_mask = tag_df["hedId"].str.startswith("HED_", na=False)
first_idx = tag_df.index[non_empty_mask][0]
schema_dataframes[constants.TAG_KEY].loc[first_idx, "hedId"] = "HED_0000000"

with self.assertRaises(HedFileError) as ctx:
update_dataframes_from_schema(schema_dataframes, schema)
self.assertGreater(len(ctx.exception.issues), 0)


if __name__ == "__main__":
Expand Down
20 changes: 19 additions & 1 deletion tests/schema/test_schema_compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def test_summary_is_list(self):
_ = list(issues)

def test_has_all_checks(self):
"""Summary should contain all 7 top-level checks."""
"""Summary should contain all 8 top-level checks."""
issues = self.schema_84.check_compliance()
summary = issues.compliance_summary
check_names = [c["name"] for c in summary.check_results]
Expand All @@ -53,6 +53,7 @@ def test_has_all_checks(self):
"invalid_characters",
"attributes",
"duplicate_names",
"duplicate_hed_ids",
"extras_columns",
Comment thread
VisLab marked this conversation as resolved.
"annotation_attributes",
]
Expand All @@ -77,6 +78,23 @@ def test_summary_entries_checked_positive(self):
if check["name"] in ("invalid_characters", "attributes", "duplicate_names"):
self.assertGreater(check["entries_checked"], 0, f"Check '{check['name']}' has no entries checked")

def test_detects_duplicate_hed_id(self):
"""check_duplicate_hed_ids must fire when two entries share a hedId."""
import copy
from hed.schema.schema_validation.compliance import SchemaValidator
from hed.errors.error_reporter import ErrorHandler

test_schema = copy.deepcopy(self.schema_84)
tags = list(test_schema[HedSectionKey.Tags].values())
shared_id = "HED_0010001"
tags[0].attributes[HedKey.HedID] = shared_id
tags[1].attributes[HedKey.HedID] = shared_id

sv = SchemaValidator(test_schema, ErrorHandler())
issues = sv.check_duplicate_hed_ids()
codes = [i["code"] for i in issues]
self.assertIn("SCHEMA_ATTRIBUTE_VALUE_INVALID", codes)

def test_summary_sections_tracked(self):
"""Character and attribute checks should list all 7 schema sections."""
issues = self.schema_84.check_compliance()
Expand Down
18 changes: 14 additions & 4 deletions tests/schema/test_schema_validator_hed_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,19 @@ def test_verify_tag_id(self):
id_validator = HedIDValidator(self.hed_schema84)

issues = id_validator.verify_tag_id(self.hed_schema84, event_entry, HedKey.HedID)
self.assertTrue("It has changed", issues[0]["message"])
self.assertTrue("between 10000", issues[0]["message"])
self.assertGreater(len(issues), 0)
messages = [i["message"] for i in issues]
self.assertTrue(any("It has changed" in m for m in messages))
self.assertTrue(any("between 10000" in m for m in messages))

event_entry = self.hed_schema84.tags["Event"]
def test_verify_tag_id_invalid_format(self):
"""A non-integer hedId should produce an INVALID format error."""
schema84 = copy.deepcopy(self.hed_schema)
schema84.header_attributes[hed_schema_constants.VERSION_ATTRIBUTE] = "8.4.0"
event_entry = schema84.tags["Event"]
event_entry.attributes[HedKey.HedID] = "HED_XXXXXXX"
self.assertTrue("It must be an integer in the format", issues[0]["message"])

id_validator = HedIDValidator(schema84)
issues = id_validator.verify_tag_id(schema84, event_entry, HedKey.HedID)
self.assertGreater(len(issues), 0)
self.assertIn("It must be an integer in the format", issues[0]["message"])
Loading