Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 92 additions & 35 deletions muninn/advanced/temporal_kg.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,54 @@ def add_temporal_fact(
Record a fact that is true only for a specific time window.
If valid_end is None, it is currently true (open-ended).
"""
return self.add_temporal_facts_batch([{
"subject": subject,
"predicate": predicate,
"obj": obj,
"valid_start": valid_start,
"source_memory": source_memory,
"valid_end": valid_end
}])

def add_temporal_facts_batch(self, facts: List[Dict[str, Any]]) -> bool:
"""Batch record facts that are true for specific time windows."""
if not facts:
return True

# Pre-create all entities first to ensure they exist
entities_to_add = []
for f in facts:
entities_to_add.append({"name": f["subject"], "entity_type": "unknown"})
entities_to_add.append({"name": f["obj"], "entity_type": "unknown"})
self.graph.add_entities_batch(entities_to_add)
Comment on lines +70 to +74

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Sending a list of entities that potentially contains many duplicates to add_entities_batch is inefficient. Deduplicating the names before the call reduces the payload size and the number of MERGE operations performed by the graph store. Additionally, the return value of add_entities_batch should be checked to ensure the operation succeeded before proceeding.

Suggested change
entities_to_add = []
for f in facts:
entities_to_add.append({"name": f["subject"], "entity_type": "unknown"})
entities_to_add.append({"name": f["obj"], "entity_type": "unknown"})
self.graph.add_entities_batch(entities_to_add)
# Pre-create all entities first to ensure they exist
unique_names = {f["subject"] for f in facts} | {f["obj"] for f in facts}
if not self.graph.add_entities_batch([{"name": name, "entity_type": "unknown"} for name in unique_names]):
return False


conn = self.graph._get_conn()
# Ensure entities exist
self.graph.add_entity(subject, "unknown")
self.graph.add_entity(obj, "unknown")

end = valid_end if valid_end is not None else float("inf")

facts_params = []
for f in facts:
valid_end = f.get("valid_end")
end = valid_end if valid_end is not None else float("inf")
facts_params.append({
"subj": f["subject"],
"obj": f["obj"],
"pred": f["predicate"],
"start": float(f["valid_start"]),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Handle valid_start conversion inside guarded path

This batch refactor moved float(f["valid_start"]) outside the try block, so add_temporal_fact(...) now raises ValueError for non-numeric input instead of returning False as it did before. That is a behavior regression for callers that rely on the boolean failure contract (or broad exception safety) and can now crash ingestion flows on malformed data.

Useful? React with 👍 / 👎.

"valid_until": float(end),
"source_mem": f["source_memory"],
})

try:
conn.execute(
"MATCH (a:Entity {name: $subj}), (b:Entity {name: $obj}) "
"CREATE (a)-[:VALID_DURING {predicate: $pred, start_time: $start, end_time: $valid_until, source_memory: $source_mem}]->(b)",
{
"subj": subject,
"obj": obj,
"pred": predicate,
"start": float(valid_start),
"valid_until": float(end),
"source_mem": source_memory,
}
"""
UNWIND $facts AS f
MATCH (a:Entity {name: f.subj}), (b:Entity {name: f.obj})
CREATE (a)-[:VALID_DURING {predicate: f.pred, start_time: f.start, end_time: f.valid_until, source_memory: f.source_mem}]->(b)
""",
{"facts": facts_params}
)
Comment on lines +78 to 99

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Matching entities by the name property is inefficient and risky. In the Entity table, id is the primary key and is indexed. Matching by name requires a full scan unless an index is explicitly created. Furthermore, in a multi-tenant environment where the same entity name might exist across different namespaces, matching solely on name can lead to incorrect edges being created between entities in different scopes. Since add_entities_batch defaults to the global scope, we should match using the corresponding scoped IDs.

        facts_params = []
        for f in facts:
            valid_end = f.get("valid_end")
            end = valid_end if valid_end is not None else float("inf")
            facts_params.append({
                "subj_id": f"global/global/{f['subject']}",
                "obj_id": f"global/global/{f['obj']}",
                "pred": f["predicate"],
                "start": float(f["valid_start"]),
                "valid_until": float(end),
                "source_mem": f["source_memory"],
            })

        try:
            conn.execute(
                """
                UNWIND $facts AS f
                MATCH (a:Entity {id: f.subj_id}), (b:Entity {id: f.obj_id})
                CREATE (a)-[:VALID_DURING {predicate: f.pred, start_time: f.start, end_time: f.valid_until, source_memory: f.source_mem}]->(b)
                """,
                {"facts": facts_params}
            )

return True
except Exception as e:
logger.warning(f"Failed to add temporal fact: {e}")
logger.warning(f"Failed to batch add temporal facts: {e}")
return False

def shadow_edge(
Expand All @@ -88,23 +113,41 @@ def shadow_edge(
Closes the validity window of an active temporal fact, creating a "Shadow Edge".
This preserves the fact in history but bypasses it for current-day retrieval.
"""
return self.shadow_edges_batch([{
"subject": subject,
"predicate": predicate,
"obj": obj,
"superseded_at": superseded_at
}])

def shadow_edges_batch(self, edges: List[Dict[str, Any]]) -> bool:
"""Closes the validity window of multiple active temporal facts."""
if not edges:
return True

conn = self.graph._get_conn()
shadow_params = []
for e in edges:
shadow_params.append({
"subj": e["subject"],
"pred": e["predicate"],
"obj": e["obj"],
"ts": float(e["superseded_at"]),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep shadow timestamp coercion inside error handling

Both shadowing paths now coerce superseded_at to float before entering try, so shadow_edge(...)/shadow_memory_edges(...) raise on bad timestamps instead of returning False like the previous single-item implementations. In environments where timestamps can be user- or model-derived strings, this turns recoverable operation failures into uncaught exceptions.

Useful? React with 👍 / 👎.

})

try:
# Match the active edge (where end_time is in the future or inf) and bound it to now.
conn.execute(
"MATCH (a:Entity {name: $subj})-[r:VALID_DURING {predicate: $pred}]->(b:Entity {name: $obj}) "
"WHERE r.end_time >= $ts "
"SET r.end_time = $ts",
{
"subj": subject,
"pred": predicate,
"obj": obj,
"ts": float(superseded_at),
}
"""
UNWIND $shadows AS s
MATCH (a:Entity {name: s.subj})-[r:VALID_DURING {predicate: s.pred}]->(b:Entity {name: s.obj})
WHERE r.end_time >= s.ts
SET r.end_time = s.ts
""",
{"shadows": shadow_params}
)
Comment on lines +129 to 147

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the batch add method, matching entities by name here is inefficient and potentially ambiguous. Using the primary key id ensures that we target the correct entities in the global scope and leverages the index for performance.

Suggested change
shadow_params = []
for e in edges:
shadow_params.append({
"subj": e["subject"],
"pred": e["predicate"],
"obj": e["obj"],
"ts": float(e["superseded_at"]),
})
try:
# Match the active edge (where end_time is in the future or inf) and bound it to now.
conn.execute(
"MATCH (a:Entity {name: $subj})-[r:VALID_DURING {predicate: $pred}]->(b:Entity {name: $obj}) "
"WHERE r.end_time >= $ts "
"SET r.end_time = $ts",
{
"subj": subject,
"pred": predicate,
"obj": obj,
"ts": float(superseded_at),
}
"""
UNWIND $shadows AS s
MATCH (a:Entity {name: s.subj})-[r:VALID_DURING {predicate: s.pred}]->(b:Entity {name: s.obj})
WHERE r.end_time >= s.ts
SET r.end_time = s.ts
""",
{"shadows": shadow_params}
)
shadow_params = []
for e in edges:
shadow_params.append({
"subj_id": f"global/global/{e['subject']}",
"obj_id": f"global/global/{e['obj']}",
"pred": e["predicate"],
"ts": float(e["superseded_at"]),
})
try:
conn.execute(
"""
UNWIND $shadows AS s
MATCH (a:Entity {id: s.subj_id})-[r:VALID_DURING {predicate: s.pred}]->(b:Entity {id: s.obj_id})
WHERE r.end_time >= s.ts
SET r.end_time = s.ts
""",
{"shadows": shadow_params}
)

return True
except Exception as e:
logger.warning(f"Failed to shadow temporal edge: {e}")
logger.warning(f"Failed to batch shadow temporal edges: {e}")
return False

def shadow_memory_edges(
Expand All @@ -115,20 +158,34 @@ def shadow_memory_edges(
"""
Closes the validity window of all active temporal facts sourced from a specific memory.
"""
return self.shadow_memory_edges_batch([{"memory_id": memory_id, "superseded_at": superseded_at}])

def shadow_memory_edges_batch(self, memories: List[Dict[str, Any]]) -> bool:
"""Closes the validity window of temporal facts sourced from specific memories."""
if not memories:
return True

conn = self.graph._get_conn()
shadow_params = []
for m in memories:
shadow_params.append({
"mem_id": m["memory_id"],
"ts": float(m["superseded_at"]),
})

try:
conn.execute(
"MATCH (:Entity)-[r:VALID_DURING {source_memory: $mem_id}]->(:Entity) "
"WHERE r.end_time >= $ts "
"SET r.end_time = $ts",
{
"mem_id": memory_id,
"ts": float(superseded_at),
}
"""
UNWIND $shadows AS s
MATCH (:Entity)-[r:VALID_DURING {source_memory: s.mem_id}]->(:Entity)
WHERE r.end_time >= s.ts
SET r.end_time = s.ts
""",
{"shadows": shadow_params}
)
return True
except Exception as e:
logger.warning(f"Failed to shadow memory edges: {e}")
logger.warning(f"Failed to batch shadow memory edges: {e}")
return False

def query_valid_at(self, timestamp: float, limit: int = 50) -> List[Dict[str, Any]]:
Expand Down
51 changes: 37 additions & 14 deletions muninn/store/graph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,48 @@ def add_entity(
user_id: str = "global",
namespace: str = "global"
) -> bool:
return self.add_entities_batch([
{
"name": name,
"entity_type": entity_type,
"user_id": user_id,
"namespace": namespace
}
])

def add_entities_batch(self, entities: List[Dict[str, Any]]) -> bool:
"""Batch insert or update multiple entities."""
if not entities:
return True

conn = self._get_conn()
now = time.time()
# Create a scoped unique ID
entity_id = f"{user_id}/{namespace}/{name}"

entities_params = []
for e in entities:
uid = e.get("user_id", "global")
ns = e.get("namespace", "global")
name = e["name"]
e_id = f"{uid}/{ns}/{name}"
entities_params.append({
"id": e_id,
"name": name,
"type": e.get("entity_type", "unknown"),
"now": now,
"uid": uid,
"ns": ns
})

try:
# Try to merge (upsert)
conn.execute(
"MERGE (e:Entity {id: $id}) "
"ON CREATE SET e.name = $name, e.user_id = $uid, e.namespace = $ns, "
"e.entity_type = $type, e.first_seen = $now, e.last_seen = $now, e.mention_count = 1 "
"ON MATCH SET e.last_seen = $now, e.mention_count = e.mention_count + 1",
{
"id": entity_id, "name": name, "uid": user_id, "ns": namespace,
"type": entity_type, "now": now
}
)
conn.execute("""
UNWIND $entities AS e
MERGE (n:Entity {id: e.id})
ON CREATE SET n.name = e.name, n.user_id = e.uid, n.namespace = e.ns, n.entity_type = e.type, n.first_seen = e.now, n.last_seen = e.now, n.mention_count = 1

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This line is excessively long, which reduces readability and violates the project's SOTA+ standards for code quality. Breaking it into multiple lines makes the ON CREATE SET logic much clearer.

                ON CREATE SET n.name = e.name, n.user_id = e.uid, n.namespace = e.ns, 
                              n.entity_type = e.type, n.first_seen = e.now, n.last_seen = e.now, 
                              n.mention_count = 1
References
  1. Every contribution must meet SOTA+ (State of the Art Plus) standards of precision, quality, and logic. (link)

ON MATCH SET n.last_seen = e.now, n.mention_count = n.mention_count + 1
""", {"entities": entities_params})
return True
except Exception as e:
logger.debug(f"Entity creation: {e}")
logger.debug(f"Batch add entities failed: {e}")
return False

def create_relation(
Expand Down
Loading