diff --git a/core/src/links/Links.ts b/core/src/links/Links.ts index a4f9428eb..92a424a2b 100644 --- a/core/src/links/Links.ts +++ b/core/src/links/Links.ts @@ -42,10 +42,14 @@ export class LinkExpression extends ExpressionGeneric(Link) { return hash; } status?: LinkStatus; + + graph?: string; }; export class LinkExpressionInput extends ExpressionGenericInput(LinkInput) { hash: () => number; status?: LinkStatus; + + graph?: string; }; export function linkEqual(l1: LinkExpression, l2: LinkExpression): boolean { @@ -53,7 +57,8 @@ export function linkEqual(l1: LinkExpression, l2: LinkExpression): boolean { l1.timestamp == l2.timestamp && l1.data.source == l2.data.source && l1.data.predicate == l2.data.predicate && - l1.data.target == l2.data.target + l1.data.target == l2.data.target && + (l1.graph ?? '') == (l2.graph ?? '') } export function isLink(l: any): boolean { diff --git a/core/src/model/Ad4mModel.ts b/core/src/model/Ad4mModel.ts index c52b4d9d3..d078757fe 100644 --- a/core/src/model/Ad4mModel.ts +++ b/core/src/model/Ad4mModel.ts @@ -516,6 +516,7 @@ export class Ad4mModel { className, properties: propertiesMetadata, relations: relationsMetadata, + graph: !!(this as any)._graphRooted, }; } @@ -536,6 +537,13 @@ export class Ad4mModel { * const recipe = new Recipe(perspective, "literal:..."); * ``` */ + /** + * The resolved named graph IRI for this instance. + * Set during `create()` to propagate parent graph context to child entities. + * @private + */ + private _resolvedGraphIri?: string; + constructor(perspective: PerspectiveProxy, baseExpression?: string) { this._baseExpression = baseExpression ? baseExpression : Literal.from(makeRandomId(24)).toUrl(); this._perspective = perspective; @@ -556,6 +564,28 @@ export class Ad4mModel { return this._perspective; } + /** + * Returns the named graph IRI for this instance. + * Priority: explicit _resolvedGraphIri (set during create with parent context) > + * model-level graph: true > undefined (default graph). + */ + get graphIri(): string | undefined { + if (this._resolvedGraphIri) return this._resolvedGraphIri; + const ctor = this.constructor as typeof Ad4mModel; + if ((ctor as any)._graphRooted) { + return `ad4m://graph/${this._baseExpression}`; + } + return undefined; + } + + /** + * Compute the named graph IRI for a given base expression. + * Only meaningful for graph-rooted models. + */ + static graphIriFor(baseExpression: string): string { + return `ad4m://graph/${baseExpression}`; + } + /** * Get property metadata from decorator. * @private @@ -1086,7 +1116,25 @@ export class Ad4mModel { query, classNameOverride, ); - const result = await perspective.modelQuery(className, queryJson, shapeJson); + // Resolve graph scoping from the parent's model metadata. + // If the parent model is graph-rooted, scope queries to the parent's graph. + // If the queried model itself is graph-rooted and has a parent, the parent + // provides the graph scope. Without a parent, queries are unscoped (union all). + const metadata = this.getModelMetadata(); + let graphIris: string[] | undefined; + if (query.parent?.id) { + if ('model' in query.parent && query.parent.model) { + const parentMeta = (query.parent.model as typeof Ad4mModel).getModelMetadata?.(); + if (parentMeta?.graph) { + graphIris = [`ad4m://graph/${query.parent.id}`]; + } + } else if (metadata.graph) { + // No parent model info, but queried model is graph-rooted — scope to parent's graph + graphIris = [`ad4m://graph/${query.parent.id}`]; + } + } + + const result = await perspective.modelQuery(className, queryJson, shapeJson, graphIris); // Convert JSON instances to model class instances, recursively constructing // class instances for any included relations resolved by Rust. @@ -1297,7 +1345,7 @@ export class Ad4mModel { value = await this._perspective.createExpression(value, resolveLanguage); } - await this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value }], batchId); + await this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value }], batchId, this.graphIri); } /** Resolve a relation argument to a plain string ID. Accepts either a raw @@ -1324,10 +1372,11 @@ export class Ad4mModel { actions, this._baseExpression, value.map((v) => ({ name: "value", value: this.resolveRelationId(v) })), - batchId + batchId, + this.graphIri ); } else { - await this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value: this.resolveRelationId(value) }], batchId); + await this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value: this.resolveRelationId(value) }], batchId, this.graphIri); } } } @@ -1346,11 +1395,11 @@ export class Ad4mModel { if (Array.isArray(value)) { await Promise.all( value.map((v) => - this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value: this.resolveRelationId(v) }], batchId) + this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value: this.resolveRelationId(v) }], batchId, this.graphIri) ) ); } else { - await this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value: this.resolveRelationId(value) }], batchId); + await this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value: this.resolveRelationId(value) }], batchId, this.graphIri); } } } @@ -1369,11 +1418,11 @@ export class Ad4mModel { if (Array.isArray(value)) { await Promise.all( value.map((v) => - this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value: this.resolveRelationId(v) }], batchId) + this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value: this.resolveRelationId(v) }], batchId, this.graphIri) ) ); } else { - await this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value: this.resolveRelationId(value) }], batchId); + await this._perspective.executeAction(actions, this._baseExpression, [{ name: "value", value: this.resolveRelationId(value) }], batchId, this.graphIri); } } } @@ -1470,7 +1519,8 @@ export class Ad4mModel { className, this._baseExpression, initialValues, - batchId + batchId, + this.graphIri ); } @@ -1754,6 +1804,15 @@ export class Ad4mModel { */ async delete(batchId?: string) { const metadata = (this.constructor as typeof Ad4mModel).getModelMetadata(); + + // Fast path: graph-rooted models can drop the entire named graph. + // The executor's removeGraph handles cross-graph reference cleanup atomically + // (removes incoming links from other graphs that target subjects in this graph). + if (metadata.graph && this.graphIri) { + await this._perspective.removeGraph(this.graphIri); + return; + } + const hasDestructor = Object.values(metadata.properties).some( (p) => p.required || p.flag || p.initial !== undefined ); @@ -1851,6 +1910,29 @@ export class Ad4mModel { const instance = new this(perspective) as T; Object.assign(instance, data); + // Resolve the graph IRI for this instance's links. + // Priority: model is graph-rooted → own graph; parent is graph-rooted → parent's graph. + const metadata = (this as typeof Ad4mModel).getModelMetadata(); + if (metadata.graph) { + // This model roots its own graph — use own base expression + instance._resolvedGraphIri = Ad4mModel.graphIriFor(instance._baseExpression); + } else if (options?.parent && 'model' in options.parent) { + // Check if the parent model is graph-rooted + const parentMeta = (options.parent.model as typeof Ad4mModel).getModelMetadata?.(); + if (parentMeta?.graph) { + instance._resolvedGraphIri = Ad4mModel.graphIriFor(options.parent.id); + } + } + + // Resolve the graph for the parent→child link (uses PARENT's graph, not child's). + let parentGraphIri: string | undefined; + if (options?.parent && 'model' in options.parent) { + const parentMeta = (options.parent.model as typeof Ad4mModel).getModelMetadata?.(); + if (parentMeta?.graph) { + parentGraphIri = Ad4mModel.graphIriFor(options.parent.id); + } + } + // When a parent scope is provided without a caller-supplied batch, open a // new batch ourselves so that the instance creation and the parent→child // link are committed atomically. If either step throws, commitBatch is @@ -1864,7 +1946,7 @@ export class Ad4mModel { predicate, target: instance.id, }); - await perspective.add(link, 'shared', batchId); + await perspective.add(link, 'shared', batchId, parentGraphIri); await perspective.commitBatch(batchId); // Hydrate the instance now that the batch has been committed (mirrors the // behaviour of save() when it manages its own batch). @@ -1882,7 +1964,7 @@ export class Ad4mModel { predicate, target: instance.id, }); - await perspective.add(link, 'shared', options?.batchId); + await perspective.add(link, 'shared', options?.batchId, parentGraphIri); } return instance; diff --git a/core/src/model/decorators.ts b/core/src/model/decorators.ts index 5452aba16..829cc4fd4 100644 --- a/core/src/model/decorators.ts +++ b/core/src/model/decorators.ts @@ -470,6 +470,11 @@ export interface ModelConfig { * The name of the entity. */ name: string; + /** + * When true, instances are stored in a named graph keyed by their base expression. + * This enables efficient bulk deletion (drop entire graph) and scoped queries. + */ + graph?: boolean; } /** @@ -530,6 +535,9 @@ export function Model(opts: ModelConfig) { return function (target: any) { target.prototype.className = opts.name; target.className = opts.name; + if (opts.graph) { + target._graphRooted = true; + } target.generateSDNA = function() { return buildSDNA( diff --git a/core/src/model/relation-filtering.test.ts b/core/src/model/relation-filtering.test.ts index d966fe39a..4c6d66ffa 100644 --- a/core/src/model/relation-filtering.test.ts +++ b/core/src/model/relation-filtering.test.ts @@ -763,7 +763,7 @@ describe("compileWhereClause()", () => { it("should compile simple equality to SPARQL condition", () => { const conditions = compileWhereClause( { status: "active" }, - { properties: { status: { name: "status", predicate: "test://status", required: false, readOnly: false } }, relations: {}, className: "Test" } + { properties: { status: { name: "status", predicate: "test://status", required: false, readOnly: false } }, relations: {}, className: "Test", graph: false } ); expect(conditions).toHaveLength(1); expect(conditions[0]).toContain("test://status"); @@ -773,7 +773,7 @@ describe("compileWhereClause()", () => { it("should compile not operator", () => { const conditions = compileWhereClause( { status: { not: "archived" } }, - { properties: { status: { name: "status", predicate: "test://status", required: false, readOnly: false } }, relations: {}, className: "Test" } + { properties: { status: { name: "status", predicate: "test://status", required: false, readOnly: false } }, relations: {}, className: "Test", graph: false } ); expect(conditions).toHaveLength(1); expect(conditions[0]).toContain("NOT EXISTS"); // negation @@ -783,7 +783,7 @@ describe("compileWhereClause()", () => { it("should compile array values to IN clause", () => { const conditions = compileWhereClause( { category: ["food", "drink"] }, - { properties: { category: { name: "category", predicate: "test://category", required: false, readOnly: false } }, relations: {}, className: "Test" } + { properties: { category: { name: "category", predicate: "test://category", required: false, readOnly: false } }, relations: {}, className: "Test", graph: false } ); expect(conditions).toHaveLength(1); expect(conditions[0]).toContain("food"); diff --git a/core/src/model/shacl-gen.ts b/core/src/model/shacl-gen.ts index 2f2f93bc1..4db424aec 100644 --- a/core/src/model/shacl-gen.ts +++ b/core/src/model/shacl-gen.ts @@ -82,6 +82,11 @@ export function buildSHACL( } } + // Set graph-rooting flag from @Model({ graph: true }) + if (target._graphRooted) { + shape.hasGraph = true; + } + // ── Constructor / Destructor actions ──────────────────────────────── let constructorActions: any[] = []; if (obj.subjectConstructor && obj.subjectConstructor.length) { diff --git a/core/src/model/types.ts b/core/src/model/types.ts index be5d2e279..2ca864ae7 100644 --- a/core/src/model/types.ts +++ b/core/src/model/types.ts @@ -239,4 +239,6 @@ export interface ModelMetadata { properties: Record; /** Map of relation name to metadata */ relations: Record; + /** Whether instances are stored in named graphs */ + graph: boolean; } diff --git a/core/src/perspectives/PerspectiveClient.test.ts b/core/src/perspectives/PerspectiveClient.test.ts index 85d31c016..d2d43c5cb 100644 --- a/core/src/perspectives/PerspectiveClient.test.ts +++ b/core/src/perspectives/PerspectiveClient.test.ts @@ -179,6 +179,7 @@ describe('PerspectiveClient RPC operations', () => { link: { source: 'a', target: 'b', predicate: 'c' }, status: 'shared', batchId: undefined, + graph: null, }) }) diff --git a/core/src/perspectives/PerspectiveClient.ts b/core/src/perspectives/PerspectiveClient.ts index 1a7c65502..1302ee663 100644 --- a/core/src/perspectives/PerspectiveClient.ts +++ b/core/src/perspectives/PerspectiveClient.ts @@ -116,11 +116,15 @@ export class PerspectiveClient { return JSON.parse(result) } - async querySparql(uuid: string, query: string): Promise { - const result = await this.#apiClient.call('perspective.querySparql', { uuid, engine: 'sparql', query }) + async querySparql(uuid: string, query: string, graphs?: string[]): Promise { + const result = await this.#apiClient.call('perspective.querySparql', { uuid, engine: 'sparql', query, graphs: graphs || null }) return JSON.parse(result) } + async namedGraphs(uuid: string): Promise { + return this.#apiClient.call('perspective.namedGraphs', { uuid }) + } + async subscribeQuery(uuid: string, query: string): Promise<{ subscriptionId: string, result: AllInstancesResult }> { const response = await this.#apiClient.call<{ subscriptionId: string, result: unknown }>( 'perspective.subscribeQuery', { uuid, query } @@ -176,9 +180,9 @@ export class PerspectiveClient { ) } - async modelQuery(uuid: string, className: string, queryJson: string, shapeJson?: string): Promise { + async modelQuery(uuid: string, className: string, queryJson: string, shapeJson?: string, graphIris?: string[]): Promise { const resultJson = await this.#apiClient.call( - 'perspective.modelQuery', { uuid, class_name: className, query_json: queryJson, shape_json: shapeJson } + 'perspective.modelQuery', { uuid, class_name: className, query_json: queryJson, shape_json: shapeJson, graph_iris: graphIris || null } ) return JSON.parse(resultJson) } @@ -202,9 +206,9 @@ export class PerspectiveClient { return JSON.parse(resultJson) } - async modelSubscribe(uuid: string, className: string, queryJson: string, shapeJson?: string): Promise<{ subscriptionId: string, result: any }> { + async modelSubscribe(uuid: string, className: string, queryJson: string, shapeJson?: string, graphIris?: string[]): Promise<{ subscriptionId: string, result: any }> { const response = await this.#apiClient.call<{ subscription_id: string, result: string }>( - 'perspective.modelSubscribe', { uuid, class_name: className, query_json: queryJson, shape_json: shapeJson } + 'perspective.modelSubscribe', { uuid, class_name: className, query_json: queryJson, shape_json: shapeJson, graph_iris: graphIris || null } ) return { subscriptionId: response.subscription_id, @@ -227,15 +231,15 @@ export class PerspectiveClient { return { perspectiveRemove: result } } - async addLink(uuid: string, link: Link, status: LinkStatus = 'shared', batchId?: string): Promise { + async addLink(uuid: string, link: Link, status: LinkStatus = 'shared', batchId?: string, graph?: string): Promise { return this.#apiClient.call( - 'perspective.addLink', { uuid, link, status, batchId } + 'perspective.addLink', { uuid, link, status, batchId, graph: graph || null } ) } - async addLinks(uuid: string, links: Link[], status: LinkStatus = 'shared', batchId?: string): Promise { + async addLinks(uuid: string, links: Link[], status: LinkStatus = 'shared', batchId?: string, graph?: string): Promise { return this.#apiClient.call( - 'perspective.addLinks', { uuid, links, status, batchId } + 'perspective.addLinks', { uuid, links, status, batchId, graph: graph || null } ) } @@ -285,15 +289,21 @@ export class PerspectiveClient { ) } - async executeCommands(uuid: string, commands: string, expression: string, parameters: string, batchId?: string): Promise { + async executeCommands(uuid: string, commands: string, expression: string, parameters: string, batchId?: string, graph?: string): Promise { + return this.#apiClient.call( + 'perspective.executeCommands', { uuid, commands, expression, parameters, batchId, graph: graph || null } + ) + } + + async removeNamedGraph(uuid: string, graphIri: string): Promise { return this.#apiClient.call( - 'perspective.executeCommands', { uuid, commands, expression, parameters, batchId } + 'perspective.removeNamedGraph', { uuid, graphIri } ) } - async createSubject(uuid: string, subjectClass: string, expressionAddress: string, initialValues?: string, batchId?: string): Promise { + async createSubject(uuid: string, subjectClass: string, expressionAddress: string, initialValues?: string, batchId?: string, graph?: string): Promise { return this.#apiClient.call( - 'perspective.createSubject', { uuid, subjectClass, expressionAddress, initialValues, batchId } + 'perspective.createSubject', { uuid, subjectClass, expressionAddress, initialValues, batchId, graph: graph || null } ) } diff --git a/core/src/perspectives/PerspectiveProxy.ts b/core/src/perspectives/PerspectiveProxy.ts index 97f6b91f2..09f58f4ec 100644 --- a/core/src/perspectives/PerspectiveProxy.ts +++ b/core/src/perspectives/PerspectiveProxy.ts @@ -515,12 +515,28 @@ export class PerspectiveProxy { * @param parameters - Optional parameters that replace "value" in actions * @param batchId - Optional batch ID to group this operation with others */ - async executeAction(actions, expression, parameters: Parameter[], batchId?: string) { - const result = await this.#client.executeCommands(this.#handle.uuid, JSON.stringify(actions), expression, JSON.stringify(parameters), batchId) + async executeAction(actions, expression, parameters: Parameter[], batchId?: string, graph?: string) { + const result = await this.#client.executeCommands(this.#handle.uuid, JSON.stringify(actions), expression, JSON.stringify(parameters), batchId, graph) invalidatePerspectiveCache(this.#handle.uuid); return result } + /** + * List all named graph IRIs in this perspective. + */ + async graphs(): Promise { + return await this.#client.namedGraphs(this.#handle.uuid); + } + + /** + * Remove a named graph and all its quads. + */ + async removeGraph(graphIri: string): Promise { + const result = await this.#client.removeNamedGraph(this.#handle.uuid, graphIri); + invalidatePerspectiveCache(this.#handle.uuid); + return result; + } + /** * Retrieves links from the perspective that match the given query. * @@ -589,10 +605,10 @@ export class PerspectiveProxy { * @param query - SPARQL query string * @returns Query results as parsed JSON */ - async querySparql(query: string): Promise { + async querySparql(query: string, graphs?: string[]): Promise { const cached = getCachedResult(this.#handle.uuid, query); if (cached !== undefined) return cached; - const result = await this.#client.querySparql(this.#handle.uuid, query); + const result = await this.#client.querySparql(this.#handle.uuid, query, graphs); setCachedResult(this.#handle.uuid, query, result); return result; } @@ -605,8 +621,8 @@ export class PerspectiveProxy { * @param shapeJson - Optional shape metadata JSON from the model class * @returns Object with `instances` array and `totalCount` */ - async modelQuery(className: string, queryJson: string, shapeJson?: string): Promise<{ instances: any[], totalCount: number }> { - return await this.#client.modelQuery(this.#handle.uuid, className, queryJson, shapeJson); + async modelQuery(className: string, queryJson: string, shapeJson?: string, graphIris?: string[]): Promise<{ instances: any[], totalCount: number }> { + return await this.#client.modelQuery(this.#handle.uuid, className, queryJson, shapeJson, graphIris); } /** @@ -646,8 +662,8 @@ export class PerspectiveProxy { * @param shapeJson - Optional shape metadata JSON from the model class * @returns Object with `subscriptionId` and initial `result` */ - async modelSubscribe(className: string, queryJson: string, shapeJson?: string): Promise<{ subscriptionId: string, result: any }> { - return await this.#client.modelSubscribe(this.#handle.uuid, className, queryJson, shapeJson); + async modelSubscribe(className: string, queryJson: string, shapeJson?: string, graphIris?: string[]): Promise<{ subscriptionId: string, result: any }> { + return await this.#client.modelSubscribe(this.#handle.uuid, className, queryJson, shapeJson, graphIris); } /** @@ -675,8 +691,8 @@ export class PerspectiveProxy { * }, "local"); * ``` */ - async add(link: Link, status: LinkStatus = 'shared', batchId?: string): Promise { - const result = await this.#client.addLink(this.#handle.uuid, link, status, batchId) + async add(link: Link, status: LinkStatus = 'shared', batchId?: string, graph?: string): Promise { + const result = await this.#client.addLink(this.#handle.uuid, link, status, batchId, graph) invalidatePerspectiveCache(this.#handle.uuid); return result; } @@ -690,8 +706,8 @@ export class PerspectiveProxy { * @param batchId - Optional batch ID to group this operation with others * @returns Array of created LinkExpressions */ - async addLinks(links: Link[], status: LinkStatus = 'shared', batchId?: string): Promise { - const result = await this.#client.addLinks(this.#handle.uuid, links, status, batchId) + async addLinks(links: Link[], status: LinkStatus = 'shared', batchId?: string, graph?: string): Promise { + const result = await this.#client.addLinks(this.#handle.uuid, links, status, batchId, graph) invalidatePerspectiveCache(this.#handle.uuid); return result; } @@ -1500,7 +1516,8 @@ export class PerspectiveProxy { subjectClass: T, exprAddr: string, initialValues?: Record, - batchId?: B + batchId?: B, + graph?: string ): Promise { let className: string; @@ -1514,7 +1531,8 @@ export class PerspectiveProxy { }), exprAddr, initialValues ? JSON.stringify(initialValues) : undefined, - batchId + batchId, + graph ); } else { const obj = subjectClass as any; @@ -1531,7 +1549,8 @@ export class PerspectiveProxy { }), exprAddr, initialValues ? JSON.stringify(initialValues) : undefined, - batchId + batchId, + graph ); } diff --git a/core/src/shacl/SHACLShape.ts b/core/src/shacl/SHACLShape.ts index aa9681f7f..e758bd601 100644 --- a/core/src/shacl/SHACLShape.ts +++ b/core/src/shacl/SHACLShape.ts @@ -204,6 +204,9 @@ export class SHACLShape { /** Parent shape URIs for model inheritance (sh:node references) */ parentShapes: string[]; + /** Whether instances are stored in named graphs */ + hasGraph: boolean; + /** * Create a new SHACL Shape * @param targetClassOrShapeUri - If one argument: the target class (shape URI auto-derived as {class}Shape) @@ -225,6 +228,7 @@ export class SHACLShape { } this.properties = []; this.parentShapes = []; + this.hasGraph = false; } /** @@ -280,6 +284,11 @@ export class SHACLShape { for (const parentUri of this.parentShapes) { turtle += ` sh:node <${parentUri}> ;\n`; } + + // Emit AD4M graph-rooting flag + if (this.hasGraph) { + turtle += ` ad4m:hasGraph true ;\n`; + } // Add property shapes for (let i = 0; i < this.properties.length; i++) { diff --git a/rust-executor/src/api/agent_ws.rs b/rust-executor/src/api/agent_ws.rs index 50a90635b..1785fc195 100644 --- a/rust-executor/src/api/agent_ws.rs +++ b/rust-executor/src/api/agent_ws.rs @@ -36,6 +36,7 @@ fn link_expression_input_to_decorated(lei: &LinkExpressionInput) -> DecoratedLin invalid: lei.proof.invalid, }, status: lei.status.clone(), + graph: None, } } diff --git a/rust-executor/src/api/perspectives_ws.rs b/rust-executor/src/api/perspectives_ws.rs index 7a7ee8794..64c1e77f2 100644 --- a/rust-executor/src/api/perspectives_ws.rs +++ b/rust-executor/src/api/perspectives_ws.rs @@ -299,7 +299,13 @@ async fn add_link(params: Value, ctx: Arc) -> Result) -> Result = body.links.into_iter().map(Link::from).collect(); - let mutations = LinkMutations { - additions: body.links, - removals: vec![], - }; - - let diff = perspective - .link_mutations(mutations, status, &agent_context) + let results = perspective + .add_links(links, status, body.batch_id, &agent_context, body.graph) .await .map_err(|e| WsRpcError::internal(e.to_string()))?; - let count = diff.additions.len(); + let count = results.len(); if count > 0 { if let Err(e) = reserve_credits(&ctx.user_email, count as f64 * DEFAULT_LINK_WRITE) { log::warn!( @@ -349,7 +351,7 @@ async fn add_links_bulk(params: Value, ctx: Arc) -> Result) -> Result { @@ -546,6 +548,10 @@ async fn query_sparql(params: Value, ctx: Arc) -> Result> = params + .as_object() + .and_then(|o| o.get("graphs")) + .and_then(|v| serde_json::from_value(v.clone()).ok()); let perspective = get_perspective_with_access(&uuid, &ctx).await?; match engine.as_str() { @@ -554,7 +560,7 @@ async fn query_sparql(params: Value, ctx: Arc) -> Result) -> Result) -> Result { + let uuid = params.require_str("uuid")?; + check_capability( + &ctx.capabilities, + &perspective_query_capability(vec![uuid.clone()]), + ) + .map_err(|e| WsRpcError::forbidden(e))?; + + let perspective = get_perspective_with_access(&uuid, &ctx).await?; + let graphs = perspective + .named_graphs() + .map_err(|e| WsRpcError::internal(e.to_string()))?; + + Ok(serde_json::to_value(graphs)?) +} + +async fn remove_named_graph(params: Value, ctx: Arc) -> Result { + let uuid = params.require_str("uuid")?; + check_capability( + &ctx.capabilities, + &perspective_update_capability(vec![uuid.clone()]), + ) + .map_err(|e| WsRpcError::forbidden(e))?; + + let graph_iri = params.require_str("graphIri")?; + let perspective = get_perspective_with_access(&uuid, &ctx).await?; + perspective + .remove_graph(&graph_iri) + .map_err(|e| WsRpcError::internal(e.to_string()))?; + + Ok(Value::Bool(true)) +} + async fn add_sdna(params: Value, ctx: Arc) -> Result { let uuid = params.require_str("uuid")?; check_capability( @@ -680,6 +719,7 @@ async fn execute_commands(params: Value, ctx: Arc) -> Result) -> Result) -> Result< let class_name = params.require_str("class_name")?; let query_json = params.require_str("query_json")?; let shape_json = params.opt_str("shape_json"); + let graph_iris: Option> = params + .as_object() + .and_then(|o| o.get("graphIris")) + .and_then(|v| serde_json::from_value(v.clone()).ok()); let perspective = get_perspective_with_access(&uuid, &ctx).await?; @@ -907,7 +952,7 @@ async fn model_query_handler(params: Value, ctx: Arc) -> Result< let result = tokio::time::timeout( Duration::from_secs(SPARQL_QUERY_TIMEOUT_SECS), tokio::task::spawn_blocking(move || { - perspective.model_query(&class_name, &query_json, shape_json.as_deref()) + perspective.model_query(&class_name, &query_json, shape_json.as_deref(), graph_iris.as_deref()) }), ) .await; @@ -1012,12 +1057,16 @@ async fn model_subscribe_handler( let class_name = params.require_str("class_name")?; let query_json = params.require_str("query_json")?; let shape_json = params.opt_str("shape_json"); + let graph_iris: Option> = params + .as_object() + .and_then(|o| o.get("graphIris")) + .and_then(|v| serde_json::from_value(v.clone()).ok()); let perspective = get_perspective_with_access(&uuid, &ctx).await?; let user_email = ctx.user_email.clone(); let (subscription_id, result_string) = perspective - .model_subscribe_and_query(class_name, query_json, shape_json, user_email) + .model_subscribe_and_query(class_name, query_json, shape_json, user_email, graph_iris) .await .map_err(|e| WsRpcError::internal(e.to_string()))?; @@ -1062,4 +1111,6 @@ pub fn register_ws_handlers(map: &mut HandlerMap) { map.register("perspective.modelQuery", model_query_handler); map.register("perspective.modelSubscribe", model_subscribe_handler); map.register("perspective.evaluateGetters", evaluate_getters_handler); + map.register("perspective.namedGraphs", named_graphs); + map.register("perspective.removeNamedGraph", remove_named_graph); } diff --git a/rust-executor/src/api/types.rs b/rust-executor/src/api/types.rs index d2b6418b4..49de46882 100644 --- a/rust-executor/src/api/types.rs +++ b/rust-executor/src/api/types.rs @@ -450,6 +450,7 @@ pub struct ExecuteCommandsRequest { pub expression: String, pub parameters: Option, pub batch_id: Option, + pub graph: Option, } // ── Hosting ── @@ -509,6 +510,7 @@ pub struct AddLinkRequest { pub link: LinkInput, pub status: Option, pub batch_id: Option, + pub graph: Option, } #[derive(Deserialize, TS)] @@ -518,6 +520,7 @@ pub struct AddLinksBulkRequest { pub links: Vec, pub status: Option, pub batch_id: Option, + pub graph: Option, } #[derive(Deserialize, TS)] @@ -606,6 +609,7 @@ pub struct CreateSubjectRequest { pub expression_address: String, pub initial_values: Option, pub batch_id: Option, + pub graph: Option, } #[derive(Deserialize, TS)] diff --git a/rust-executor/src/db.rs b/rust-executor/src/db.rs index ff75498a7..a907e59f1 100644 --- a/rust-executor/src/db.rs +++ b/rust-executor/src/db.rs @@ -1434,6 +1434,7 @@ impl Ad4mDb { author: row.get(4)?, timestamp: row.get(5)?, status: Some(status.clone()), + graph: None, }; Ok((link, status)) @@ -1472,6 +1473,7 @@ impl Ad4mDb { author: row.get(4)?, timestamp: row.get(5)?, status: Some(status.clone()), + graph: None, }; Ok((link_expression, status)) })?; @@ -1509,6 +1511,7 @@ impl Ad4mDb { author: row.get(4)?, timestamp: row.get(5)?, status: Some(status.clone()), + graph: None, }; Ok((link_expression, status)) })?; @@ -1546,6 +1549,7 @@ impl Ad4mDb { author: row.get(4)?, timestamp: row.get(5)?, status: Some(status.clone()), + graph: None, }; Ok((link_expression, status)) })?; @@ -1584,6 +1588,7 @@ impl Ad4mDb { author: row.get(4)?, timestamp: row.get(5)?, status: Some(status.clone()), + graph: None, }; Ok((link_expression, status)) })?; @@ -3946,6 +3951,7 @@ mod tests { author: "did:test:key".to_string(), timestamp: Utc::now().to_rfc3339(), status: Some(status), + graph: None, } } diff --git a/rust-executor/src/mcp/tools/children.rs b/rust-executor/src/mcp/tools/children.rs index 11eae5f99..89e6f1644 100644 --- a/rust-executor/src/mcp/tools/children.rs +++ b/rust-executor/src/mcp/tools/children.rs @@ -179,7 +179,7 @@ impl Ad4mMcpHandler { }; match perspective - .add_link(link, LinkStatus::Shared, None, &agent_context) + .add_link(link, LinkStatus::Shared, None, &agent_context, None) .await { Ok(decorated) => { diff --git a/rust-executor/src/mcp/tools/dynamic.rs b/rust-executor/src/mcp/tools/dynamic.rs index adce2eba5..b3b8c2bd0 100644 --- a/rust-executor/src/mcp/tools/dynamic.rs +++ b/rust-executor/src/mcp/tools/dynamic.rs @@ -485,6 +485,7 @@ impl Ad4mMcpHandler { initial_values, None, &agent_context, + None, ) .await { @@ -511,7 +512,7 @@ impl Ad4mMcpHandler { }; if let Err(e) = perspective - .add_link(link, LinkStatus::Shared, None, &agent_context) + .add_link(link, LinkStatus::Shared, None, &agent_context, None) .await { return serde_json::to_string_pretty(&json!({ @@ -1050,6 +1051,7 @@ impl Ad4mMcpHandler { LinkStatus::Shared, Some(batch_id.clone()), &agent_context, + None, ) .await { @@ -1214,6 +1216,7 @@ impl Ad4mMcpHandler { LinkStatus::Shared, Some(batch_id.clone()), &agent_context, + None, ) .await { @@ -1336,7 +1339,7 @@ impl Ad4mMcpHandler { }; match perspective - .add_link(link, LinkStatus::Shared, None, &agent_context) + .add_link(link, LinkStatus::Shared, None, &agent_context, None) .await { Ok(_) => serde_json::to_string_pretty(&json!({ diff --git a/rust-executor/src/mcp/tools/perspectives.rs b/rust-executor/src/mcp/tools/perspectives.rs index 514cf54de..0023d748e 100644 --- a/rust-executor/src/mcp/tools/perspectives.rs +++ b/rust-executor/src/mcp/tools/perspectives.rs @@ -221,7 +221,7 @@ impl Ad4mMcpHandler { }; match perspective - .add_link(link, LinkStatus::Shared, None, &agent_context) + .add_link(link, LinkStatus::Shared, None, &agent_context, None) .await { Ok(decorated) => { diff --git a/rust-executor/src/mcp/tools/profiles.rs b/rust-executor/src/mcp/tools/profiles.rs index aa3cf778c..cbdc52228 100644 --- a/rust-executor/src/mcp/tools/profiles.rs +++ b/rust-executor/src/mcp/tools/profiles.rs @@ -158,6 +158,7 @@ fn make_profile_link( data: verified.data, proof: verified.proof, status: None, + graph: None, }) } diff --git a/rust-executor/src/mcp/tools/subjects.rs b/rust-executor/src/mcp/tools/subjects.rs index 191a153cc..95e866c4c 100644 --- a/rust-executor/src/mcp/tools/subjects.rs +++ b/rust-executor/src/mcp/tools/subjects.rs @@ -403,6 +403,7 @@ impl Ad4mMcpHandler { initial_values, None, &agent_context, + None, ) .await { @@ -452,6 +453,7 @@ impl Ad4mMcpHandler { parameters, None, &agent_context, + None, ) .await { @@ -523,7 +525,7 @@ impl Ad4mMcpHandler { }; match perspective - .add_link(link, LinkStatus::Shared, None, &agent_context) + .add_link(link, LinkStatus::Shared, None, &agent_context, None) .await { Ok(_) => serde_json::to_string_pretty(&json!({ @@ -613,7 +615,7 @@ impl Ad4mMcpHandler { }; match perspective - .add_link(link, LinkStatus::Shared, None, &agent_context) + .add_link(link, LinkStatus::Shared, None, &agent_context, None) .await { Ok(_) => serde_json::to_string_pretty(&json!({ diff --git a/rust-executor/src/perspectives/migration.rs b/rust-executor/src/perspectives/migration.rs index f8fc39bd2..90b6423a2 100644 --- a/rust-executor/src/perspectives/migration.rs +++ b/rust-executor/src/perspectives/migration.rs @@ -145,6 +145,7 @@ pub fn migrate_links_from_rusqlite_to_sparql( invalid: None, }, status: Some(status.clone()), + graph: None, }; // Convert literal:// → literal: in all URI fields @@ -316,6 +317,7 @@ mod tests { invalid: None, }, status: None, + graph: None, }; let conversions = convert_link_literal_uris(&mut link); @@ -342,6 +344,7 @@ mod tests { invalid: None, }, status: None, + graph: None, }; let conversions = convert_link_literal_uris(&mut link); @@ -368,6 +371,7 @@ mod tests { invalid: None, }, status: None, + graph: None, }; let conversions = convert_link_literal_uris(&mut link); @@ -394,6 +398,7 @@ mod tests { invalid: None, }, status: None, + graph: None, }; let conversions = convert_link_literal_uris(&mut link); @@ -455,6 +460,7 @@ mod tests { key: "key1".to_string(), }, status: Some(LinkStatus::Local), + graph: None, }; let link2 = LinkExpression { @@ -470,6 +476,7 @@ mod tests { key: "key2".to_string(), }, status: Some(LinkStatus::Local), + graph: None, }; Ad4mDb::with_global_instance(|db| { @@ -538,6 +545,7 @@ mod tests { key: "key".to_string(), }, status: Some(LinkStatus::Shared), + graph: None, }; let link_already_canonical = LinkExpression { @@ -553,6 +561,7 @@ mod tests { key: "key2".to_string(), }, status: Some(LinkStatus::Shared), + graph: None, }; Ad4mDb::with_global_instance(|db| { @@ -623,6 +632,7 @@ mod tests { key: "key".to_string(), }, status: Some(LinkStatus::Local), + graph: None, }; Ad4mDb::with_global_instance(|db| { diff --git a/rust-executor/src/perspectives/mod.rs b/rust-executor/src/perspectives/mod.rs index fa1021024..85bcd0eaa 100644 --- a/rust-executor/src/perspectives/mod.rs +++ b/rust-executor/src/perspectives/mod.rs @@ -798,6 +798,7 @@ mod tests { signature: "test-signature".to_string(), }, status: Some(LinkStatus::Local), + graph: None, }; println!("test_link: {:?}", test_link); diff --git a/rust-executor/src/perspectives/model_query/getters.rs b/rust-executor/src/perspectives/model_query/getters.rs index 1f566b6a4..aee8af74e 100644 --- a/rust-executor/src/perspectives/model_query/getters.rs +++ b/rust-executor/src/perspectives/model_query/getters.rs @@ -78,7 +78,7 @@ pub fn evaluate_getters_batch( }) .collect(); - evaluate_getters(store, &mut instances, &shape, None, true)?; + evaluate_getters(store, &mut instances, &shape, None, true, None)?; let mut result = Map::new(); for inst in &instances { @@ -180,6 +180,7 @@ pub(super) fn evaluate_getters( shape: &ModelShape, _include: Option<&HashMap>, deep_query: bool, + graph_iris: Option<&[String]>, ) -> Result<(), Error> { let getter_props: Vec<&ShapeProperty> = shape .properties @@ -219,7 +220,7 @@ pub(super) fn evaluate_getters( if upper.starts_with("ASK") { let batched = convert_ask_to_batched_select(getter, &values_clause); - match store.query(&batched) { + match store.query_with_graphs(&batched, graph_iris) { Ok(result_json) => { let rows: Vec = serde_json::from_str(&result_json).unwrap_or_default(); let matched: std::collections::HashSet<&str> = rows @@ -248,7 +249,7 @@ pub(super) fn evaluate_getters( } else if upper.starts_with("SELECT") { let batched = inject_values_into_select(getter, &values_clause); - match store.query(&batched) { + match store.query_with_graphs(&batched, graph_iris) { Ok(result_json) => { let rows: Vec = serde_json::from_str(&result_json).unwrap_or_default(); diff --git a/rust-executor/src/perspectives/model_query/integration_tests.rs b/rust-executor/src/perspectives/model_query/integration_tests.rs index 837f9a492..70c5afa38 100644 --- a/rust-executor/src/perspectives/model_query/integration_tests.rs +++ b/rust-executor/src/perspectives/model_query/integration_tests.rs @@ -32,6 +32,7 @@ fn make_link(source: &str, predicate: &str, target: &str, ts: &str) -> Decorated invalid: Some(false), }, status: None, + graph: None, } } @@ -77,7 +78,7 @@ fn test_full_model_query_with_where_filter() { // Query without WHERE - should find 1 instance let query_no_where = ModelQueryInput::default(); - let result = execute_model_query(&store, "Recipe", &query_no_where, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Recipe", &query_no_where, Some(shape_json), None).unwrap(); assert_eq!( result.instances.len(), 1, @@ -99,7 +100,7 @@ fn test_full_model_query_with_where_filter() { ..Default::default() }; let result2 = - execute_model_query(&store, "Recipe", &query_with_where, Some(shape_json)).unwrap(); + execute_model_query(&store, "Recipe", &query_with_where, Some(shape_json), None).unwrap(); assert_eq!( result2.instances.len(), 1, @@ -162,7 +163,7 @@ fn test_where_clause_raw_uri_property() { where_clause: Some(where_clause), ..Default::default() }; - let result = execute_model_query(&store, "Todo", &query, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Todo", &query, Some(shape_json), None).unwrap(); assert_eq!( result.instances.len(), 1, @@ -228,7 +229,7 @@ fn test_where_clause_literal_prop_with_raw_uri_value() { where_clause: Some(where_clause), ..Default::default() }; - let result = execute_model_query(&store, "Todo", &query, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Todo", &query, Some(shape_json), None).unwrap(); assert_eq!( result.instances.len(), 1, @@ -374,7 +375,7 @@ fn test_shared_predicate_relations_all_populated_via_store() { }"#; let query = ModelQueryInput::default(); - let result = execute_model_query(&store, "Channel", &query, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Channel", &query, Some(shape_json), None).unwrap(); assert_eq!(result.instances.len(), 1, "Should find 1 channel"); @@ -498,7 +499,7 @@ fn test_shared_predicate_with_unique_predicates_no_cross_contamination() { }"#; let query = ModelQueryInput::default(); - let result = execute_model_query(&store, "Parent", &query, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Parent", &query, Some(shape_json), None).unwrap(); assert_eq!(result.instances.len(), 1); let inst = &result.instances[0]; @@ -643,6 +644,7 @@ fn make_shape_with_relation(class: &str, rel_name: &str, predicate: &str) -> Mod where_predicates: None, }], include_relations: vec![], + has_graph: false, } } @@ -1004,6 +1006,7 @@ fn test_evaluate_getters_batch_returns_results() { &["test://inst-1".to_string()], None, Some(shape_json), + None, ) .unwrap(); @@ -1053,6 +1056,7 @@ fn test_evaluate_getters_batch_filters_by_property_names() { &["test://inst-1".to_string()], Some(&["propA".to_string()]), Some(shape_json), + None, ) .unwrap(); @@ -1184,10 +1188,11 @@ fn test_evaluate_getters_where_compiled_literal_filter() { where_predicates: Some(where_predicates), }], include_relations: vec![], + has_graph: false, }; let mut instances = vec![serde_json::json!({"id": board})]; - let eval_result = evaluate_getters(&store, &mut instances, &shape, None, true); + let eval_result = evaluate_getters(&store, &mut instances, &shape, None, true, None); assert!( eval_result.is_ok(), "evaluate_getters should succeed: {:?}", @@ -1325,6 +1330,7 @@ fn test_batched_ask_getter_multiple_instances() { &["test://inst-1".to_string(), "test://inst-2".to_string()], None, Some(shape_json), + None, ) .unwrap(); @@ -1369,6 +1375,7 @@ fn test_batched_select_getter_multiple_instances() { &["test://inst-1".to_string(), "test://inst-2".to_string()], None, Some(shape_json), + None, ) .unwrap(); @@ -1433,6 +1440,7 @@ fn test_batched_collection_getter() { &["test://inst-1".to_string(), "test://inst-2".to_string()], None, Some(shape_json), + None, ) .unwrap(); @@ -1488,7 +1496,7 @@ fn test_deep_query_defaults_to_true() { ..Default::default() }; - let result = execute_model_query(&store, "Message", &query_input, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Message", &query_input, Some(shape_json), None).unwrap(); assert!(!result.instances.is_empty(), "should find instance"); let inst = &result.instances[0]; @@ -1540,7 +1548,7 @@ fn test_deep_query_false_skips_property_getters() { ..Default::default() }; - let result = execute_model_query(&store, "Message", &query_input, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Message", &query_input, Some(shape_json), None).unwrap(); assert!(!result.instances.is_empty()); let inst = &result.instances[0]; @@ -1607,7 +1615,7 @@ fn test_getters_run_after_pagination() { ..Default::default() }; - let result = execute_model_query(&store, "Message", &query_input, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Message", &query_input, Some(shape_json), None).unwrap(); assert_eq!(result.instances.len(), 2, "should return 2 instances"); assert_eq!(result.total_count, 5, "total count should be 5"); @@ -1741,10 +1749,11 @@ fn test_where_filter_signed_expression_string() { where_predicates: Some(where_predicates), }], include_relations: vec![], + has_graph: false, }; let mut instances = vec![json!({"id": board})]; - evaluate_getters(&store, &mut instances, &shape, None, true).unwrap(); + evaluate_getters(&store, &mut instances, &shape, None, true, None).unwrap(); let active = instances[0]["activeTasks"].as_array().unwrap(); assert_eq!( @@ -1814,10 +1823,11 @@ fn test_where_filter_signed_expression_no_matches() { where_predicates: Some(where_predicates), }], include_relations: vec![], + has_graph: false, }; let mut instances = vec![json!({"id": parent})]; - evaluate_getters(&store, &mut instances, &shape, None, true).unwrap(); + evaluate_getters(&store, &mut instances, &shape, None, true, None).unwrap(); let result = instances[0]["activeChildren"].as_array().unwrap(); assert_eq!(result.len(), 0, "Should be empty when no matches"); @@ -1935,10 +1945,11 @@ fn test_where_filter_multiple_conditions() { where_predicates: Some(where_predicates), }], include_relations: vec![], + has_graph: false, }; let mut instances = vec![json!({"id": board})]; - evaluate_getters(&store, &mut instances, &shape, None, true).unwrap(); + evaluate_getters(&store, &mut instances, &shape, None, true, None).unwrap(); let result = instances[0]["highPriActive"].as_array().unwrap(); assert_eq!(result.len(), 1, "Only task_hi should match: {:?}", result); @@ -2001,10 +2012,11 @@ fn test_where_filter_missing_property_on_target() { where_predicates: Some(where_predicates), }], include_relations: vec![], + has_graph: false, }; let mut instances = vec![json!({"id": parent})]; - evaluate_getters(&store, &mut instances, &shape, None, true).unwrap(); + evaluate_getters(&store, &mut instances, &shape, None, true, None).unwrap(); let result = instances[0]["active"].as_array().unwrap(); assert_eq!(result.len(), 1, "Only child_with should match"); @@ -2065,10 +2077,11 @@ fn test_where_filter_plain_literal_string() { where_predicates: Some(where_predicates), }], include_relations: vec![], + has_graph: false, }; let mut instances = vec![json!({"id": parent})]; - evaluate_getters(&store, &mut instances, &shape, None, true).unwrap(); + evaluate_getters(&store, &mut instances, &shape, None, true, None).unwrap(); let result = instances[0]["redChildren"].as_array().unwrap(); assert_eq!(result.len(), 1); @@ -2152,10 +2165,11 @@ fn test_where_filter_on_multiple_instances() { where_predicates: Some(where_predicates), }], include_relations: vec![], + has_graph: false, }; let mut instances = vec![json!({"id": board1}), json!({"id": board2})]; - evaluate_getters(&store, &mut instances, &shape, None, true).unwrap(); + evaluate_getters(&store, &mut instances, &shape, None, true, None).unwrap(); let active1 = instances[0]["activeTasks"].as_array().unwrap(); assert_eq!(active1.len(), 1, "board1 should have 1 active task"); @@ -2243,7 +2257,7 @@ fn test_full_model_query_signed_expression_where() { ..Default::default() }; - let result = execute_model_query(&store, "Item", &query, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Item", &query, Some(shape_json), None).unwrap(); assert_eq!( result.instances.len(), 2, @@ -2318,7 +2332,7 @@ fn test_full_model_query_signed_expression_numeric_where() { ..Default::default() }; - let result = execute_model_query(&store, "Item", &query, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Item", &query, Some(shape_json), None).unwrap(); assert_eq!( result.instances.len(), 1, @@ -2369,7 +2383,7 @@ fn test_full_model_query_signed_expression_boolean_where() { ..Default::default() }; - let result = execute_model_query(&store, "Thing", &query, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Thing", &query, Some(shape_json), None).unwrap(); assert_eq!(result.instances.len(), 1); assert_eq!(result.instances[0]["id"].as_str().unwrap(), item1); } @@ -2435,7 +2449,7 @@ fn test_full_model_query_where_string_array_in() { ..Default::default() }; - let result = execute_model_query(&store, "Item", &query, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Item", &query, Some(shape_json), None).unwrap(); assert_eq!(result.instances.len(), 2, "active and pending should match"); } @@ -2494,7 +2508,7 @@ fn test_full_model_query_where_ops_not() { ..Default::default() }; - let result = execute_model_query(&store, "Item", &query, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Item", &query, Some(shape_json), None).unwrap(); assert_eq!(result.instances.len(), 1); assert_eq!(result.instances[0]["id"].as_str().unwrap(), item1); } @@ -2556,6 +2570,7 @@ fn make_shape(props: Vec) -> ModelShape { shape_uri: String::new(), properties: props, include_relations: vec![], + has_graph: false, } } @@ -2785,7 +2800,7 @@ fn test_build_instance_sparql_integration_getter_excluded_from_results() { }"#; let query = ModelQueryInput::default(); - let result = execute_model_query(&store, "Channel", &query, Some(shape_json)).unwrap(); + let result = execute_model_query(&store, "Channel", &query, Some(shape_json), None).unwrap(); assert_eq!(result.instances.len(), 1, "Should find exactly 1 channel"); assert_eq!( @@ -2863,6 +2878,7 @@ fn test_full_model_query_ops_gt_lt_sparql_push() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!(result.instances.len(), 2, "gt:50 should match 70 and 90"); @@ -2884,6 +2900,7 @@ fn test_full_model_query_ops_gt_lt_sparql_push() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!(result.instances.len(), 2, "lt:50 should match 10 and 30"); @@ -2905,6 +2922,7 @@ fn test_full_model_query_ops_gt_lt_sparql_push() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!( @@ -2961,6 +2979,7 @@ fn test_full_model_query_ops_gte_lte_sparql_push() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!(result.instances.len(), 2, "gte:50 should match 50 and 90"); @@ -2982,6 +3001,7 @@ fn test_full_model_query_ops_gte_lte_sparql_push() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!(result.instances.len(), 2, "lte:50 should match 10 and 50"); @@ -3033,6 +3053,7 @@ fn test_full_model_query_ops_not_string_sparql_push() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!(result.instances.len(), 2, "NOT done → 2 active tasks"); @@ -3082,6 +3103,7 @@ fn test_full_model_query_ops_not_array_sparql_push() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!( @@ -3141,6 +3163,7 @@ fn test_full_model_query_ops_with_pagination_pushed() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!(result.instances.len(), 3, "Should get 3 items in page"); @@ -3191,6 +3214,7 @@ fn test_full_model_query_ops_contains_sparql_push() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!( @@ -3255,6 +3279,7 @@ fn test_full_model_query_ops_contains_with_pagination() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!(result.instances.len(), 1, "Should get 1 item in page"); @@ -3351,6 +3376,7 @@ fn test_signed_envelope_where_paginate_count() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); @@ -3388,6 +3414,7 @@ fn test_signed_envelope_where_paginate_count() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); @@ -3474,6 +3501,7 @@ fn test_mixed_plain_and_signed_envelope_where() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); @@ -3498,6 +3526,7 @@ fn test_mixed_plain_and_signed_envelope_where() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); @@ -3573,6 +3602,7 @@ fn test_perf_large_dataset_paginated_query() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); let elapsed = start.elapsed(); @@ -3683,6 +3713,7 @@ fn test_perf_flux_message_parent_scope_paginated() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); let elapsed = start.elapsed(); @@ -3863,6 +3894,7 @@ fn test_full_model_query_order_by_property_string() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!(result.instances.len(), 2); @@ -3881,6 +3913,7 @@ fn test_full_model_query_order_by_property_string() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!(result.instances.len(), 2); @@ -3930,6 +3963,7 @@ fn test_full_model_query_order_by_property_number() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); assert_eq!(result.instances.len(), 3); @@ -3951,6 +3985,7 @@ fn test_full_model_query_order_by_property_number() { ..Default::default() }, Some(shape_json), + None, ) .unwrap(); let got_scores: Vec = result diff --git a/rust-executor/src/perspectives/model_query/projection.rs b/rust-executor/src/perspectives/model_query/projection.rs index 599a668fd..34a53fc71 100644 --- a/rust-executor/src/perspectives/model_query/projection.rs +++ b/rust-executor/src/perspectives/model_query/projection.rs @@ -31,6 +31,7 @@ pub(super) fn resolve_projections( instances: &mut Vec, projections: &HashMap, shape: &ModelShape, + graph_iris: Option<&[String]>, ) -> Result<(), deno_core::anyhow::Error> { if instances.is_empty() || projections.is_empty() { return Ok(()); @@ -104,7 +105,7 @@ pub(super) fn resolve_projections( reifier_patterns = reifier_patterns, ); - let result_json = store.query(&sparql)?; + let result_json = store.query_with_graphs(&sparql, graph_iris)?; let rows: Vec = serde_json::from_str(&result_json)?; let mut count_map: HashMap = HashMap::new(); @@ -148,7 +149,7 @@ pub(super) fn resolve_projections( order_clause = order_clause, ); - let result_json = store.query(&sparql)?; + let result_json = store.query_with_graphs(&sparql, graph_iris)?; let rows: Vec = serde_json::from_str(&result_json)?; let mut list_map: HashMap> = HashMap::new(); diff --git a/rust-executor/src/perspectives/model_query/query.rs b/rust-executor/src/perspectives/model_query/query.rs index 83914f7ef..529ac25fc 100644 --- a/rust-executor/src/perspectives/model_query/query.rs +++ b/rust-executor/src/perspectives/model_query/query.rs @@ -38,8 +38,9 @@ pub fn execute_model_query( class_name: &str, query_input: &ModelQueryInput, shape_json: Option<&str>, + graph_iris: Option<&[String]>, ) -> Result { - execute_model_query_inner(store, class_name, query_input, shape_json, 0) + execute_model_query_inner(store, class_name, query_input, shape_json, 0, graph_iris) } /// Inner implementation with recursion depth tracking. @@ -53,6 +54,7 @@ pub(super) fn execute_model_query_inner( query_input: &ModelQueryInput, shape_json: Option<&str>, depth: u8, + graph_iris: Option<&[String]>, ) -> Result { if depth > MAX_INCLUDE_DEPTH { log::warn!( @@ -76,7 +78,7 @@ pub(super) fn execute_model_query_inner( let is_count_only = query_input.limit == Some(0); if is_count_only && all_where_pushable(query_input, &shape) { if let Some(sparql) = build_count_sparql(&shape, query_input) { - let result_json = store.query(&sparql)?; + let result_json = store.query_with_graphs(&sparql, graph_iris)?; let results: Vec = serde_json::from_str(&result_json)?; let count = results .first() @@ -149,14 +151,14 @@ pub(super) fn execute_model_query_inner( let raw_results: Vec = match query_plan { InstanceQueryPlan::Single(sparql) => { - let result_json = store.query(&sparql)?; + let result_json = store.query_with_graphs(&sparql, graph_iris)?; serde_json::from_str(&result_json)? } InstanceQueryPlan::TwoPhase { pagination_subquery, predicate_filter, } => { - let page_json = store.query(&pagination_subquery)?; + let page_json = store.query_with_graphs(&pagination_subquery, graph_iris)?; let page_results: Vec = serde_json::from_str(&page_json)?; if page_results.is_empty() { @@ -183,7 +185,7 @@ pub(super) fn execute_model_query_inner( ?_reifier ?timestamp . }}"# ); - let result_json = store.query(&property_sparql)?; + let result_json = store.query_with_graphs(&property_sparql, graph_iris)?; serde_json::from_str(&result_json)? } } @@ -201,7 +203,7 @@ pub(super) fn execute_model_query_inner( .map(|p| (p.name.clone(), p.predicate.clone(), p.is_scalar_relation)) .collect(); if !reverse_rels.is_empty() && !instances.is_empty() { - resolve_reverse_relations(store, &mut instances, &reverse_rels)?; + resolve_reverse_relations(store, &mut instances, &reverse_rels, graph_iris)?; } // Apply post-hydration where-clause filters @@ -214,7 +216,7 @@ pub(super) fn execute_model_query_inner( // Calculate total count let total_count = if sparql_pagination.is_some() { if let Some(count_sparql) = build_count_sparql(&shape, query_input) { - let result_json = store.query(&count_sparql)?; + let result_json = store.query_with_graphs(&count_sparql, graph_iris)?; let results: Vec = serde_json::from_str(&result_json)?; results .first() @@ -270,13 +272,14 @@ pub(super) fn execute_model_query_inner( &shape, query_input.include.as_ref(), deep_query, + graph_iris, )?; } // Eager-load included relations if let Some(ref include) = query_input.include { if !paginated.is_empty() && !shape.include_relations.is_empty() { - resolve_includes_recursive(store, &mut paginated, include, &shape, depth)?; + resolve_includes_recursive(store, &mut paginated, include, &shape, depth, graph_iris)?; } } @@ -300,7 +303,7 @@ pub(super) fn execute_model_query_inner( // Attach projection results if let Some(ref projections) = query_input.projections { - resolve_projections(store, &mut final_instances, projections, &shape)?; + resolve_projections(store, &mut final_instances, projections, &shape, graph_iris)?; } Ok(ModelQueryResult { diff --git a/rust-executor/src/perspectives/model_query/relations.rs b/rust-executor/src/perspectives/model_query/relations.rs index b7ff925e3..8c62d8506 100644 --- a/rust-executor/src/perspectives/model_query/relations.rs +++ b/rust-executor/src/perspectives/model_query/relations.rs @@ -33,6 +33,7 @@ pub fn resolve_reverse_relations( store: &SparqlStore, instances: &mut [Value], relations: &[(String, String, bool)], // (name, predicate, is_single) + _graph_iris: Option<&[String]>, ) -> Result<(), Error> { if relations.is_empty() || instances.is_empty() { return Ok(()); @@ -116,6 +117,7 @@ pub(super) fn resolve_includes_recursive( include: &HashMap, shape: &ModelShape, depth: u8, + graph_iris: Option<&[String]>, ) -> Result<(), Error> { for (rel_name, include_val) in include { match include_val { @@ -135,9 +137,9 @@ pub(super) fn resolve_includes_recursive( }; if rel.direction == "reverse" { - resolve_reverse_include(store, instances, rel, &sub_query, depth)?; + resolve_reverse_include(store, instances, rel, &sub_query, depth, graph_iris)?; } else { - resolve_forward_include(store, instances, rel, &sub_query, depth)?; + resolve_forward_include(store, instances, rel, &sub_query, depth, graph_iris)?; } } Ok(()) @@ -154,6 +156,7 @@ fn resolve_forward_include( rel: &ShapeRelation, sub_query: &ModelQueryInput, depth: u8, + graph_iris: Option<&[String]>, ) -> Result<(), Error> { let mut seen = std::collections::HashSet::new(); let mut all_ids: Vec = Vec::new(); @@ -198,6 +201,7 @@ fn resolve_forward_include( &query, Some(&rel.target_shape_json), depth + 1, + graph_iris, )?; let mut hydrated: HashMap = HashMap::new(); @@ -262,6 +266,7 @@ fn resolve_reverse_include( rel: &ShapeRelation, sub_query: &ModelQueryInput, depth: u8, + graph_iris: Option<&[String]>, ) -> Result<(), Error> { let all_ids: Vec = instances .iter() @@ -287,7 +292,7 @@ fn resolve_reverse_include( let sparql = format!( "SELECT ?source ?target WHERE {{ ?source <{safe_pred}> ?target . FILTER(?target IN ({id_list})) }}" ); - let result_json = store.query(&sparql)?; + let result_json = store.query_with_graphs(&sparql, graph_iris)?; let rows: Vec = serde_json::from_str(&result_json)?; let mut sources_by_target: HashMap> = HashMap::new(); @@ -346,6 +351,7 @@ fn resolve_reverse_include( &query, Some(&rel.target_shape_json), depth + 1, + graph_iris, )?; ordered_result_ids = result diff --git a/rust-executor/src/perspectives/model_query/shape.rs b/rust-executor/src/perspectives/model_query/shape.rs index 73c4e7588..46098bbde 100644 --- a/rust-executor/src/perspectives/model_query/shape.rs +++ b/rust-executor/src/perspectives/model_query/shape.rs @@ -165,6 +165,7 @@ pub(super) fn load_shape(store: &SparqlStore, class_name: &str) -> Result Result) -> ModelShape { shape_uri: format!("{class}Shape"), properties, include_relations: Vec::new(), + has_graph: false, } } diff --git a/rust-executor/src/perspectives/model_query/types.rs b/rust-executor/src/perspectives/model_query/types.rs index e55fd41f3..d61411388 100644 --- a/rust-executor/src/perspectives/model_query/types.rs +++ b/rust-executor/src/perspectives/model_query/types.rs @@ -332,6 +332,9 @@ pub(crate) struct ModelShape { /// Enriched relation metadata for include resolution (only populated /// when the TS client sends target shapes for included relations). pub(super) include_relations: Vec, + /// Whether instances of this model are stored in named graphs + #[allow(dead_code)] + pub(super) has_graph: bool, } impl ModelShape { diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index 5f35f3fcd..b2e5f2f3a 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -32,6 +32,7 @@ use chrono::DateTime; use deno_core::anyhow::anyhow; use deno_core::error::AnyError; use futures::future; +use oxigraph::model::GraphName; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::{BTreeMap, HashMap, HashSet}; @@ -54,6 +55,17 @@ enum ChangedPredicates { /// Only specific predicates changed Specific(HashSet), } + +/// Tracks which named graphs have changed since the last subscription check. +#[derive(Debug, Clone)] +enum ChangedGraphs { + /// No graph changes recorded + NoneRecorded, + /// A link with no graph was seen — must check all graph-scoped subs + DefaultGraphChanged, + /// Only specific named graphs changed + Specific(HashSet), +} use uuid; use uuid::Uuid; @@ -194,6 +206,10 @@ struct SubscribedQuery { /// When set, this subscription was registered via `model_subscribe_and_query`. /// On trigger, `execute_model_query` is called instead of re-running raw SPARQL. model_query_params: Option, + /// Named graph IRIs this subscription watches. Derived from the model query's + /// graph_iris (which generates FROM clauses in the query). When set, only changes + /// in these graphs trigger re-evaluation. For raw SPARQL subscriptions, this is None. + graph_scope: Option>, } /// Parameters for a model query subscription. Stored alongside the trigger SPARQL @@ -203,6 +219,7 @@ struct ModelSubscriptionParams { class_name: String, query_json: String, shape_json: Option, + graph_iris: Option>, } /// Extract predicate IRIs from a SPARQL query by finding triple patterns. @@ -251,6 +268,29 @@ fn extract_predicates_from_sparql(query: &str) -> HashSet { predicates } +/// Extract FROM graph IRIs from a SPARQL query's dataset declaration. +/// Returns Some(vec) if the query declares specific default graphs, None otherwise. +fn extract_from_graph_iris(query: &str) -> Option> { + let parsed = oxigraph::sparql::Query::parse(query, None).ok()?; + let dataset = parsed.dataset(); + if dataset.is_default_dataset() { + return None; + } + let graphs: Vec = dataset + .default_graph_graphs()? + .iter() + .filter_map(|g| match g { + GraphName::NamedNode(n) => Some(n.as_str().to_string()), + _ => None, + }) + .collect(); + if graphs.is_empty() { + None + } else { + Some(graphs) + } +} + #[derive(Clone)] pub struct PerspectiveInstance { pub persisted: Arc>, @@ -269,6 +309,8 @@ pub struct PerspectiveInstance { trigger_prolog_subscription_check: Arc, /// Predicates of links changed since last subscription check. changed_predicates: Arc>, + /// Named graphs changed since last subscription check. + changed_graphs: Arc>, commit_debounce_timer: Arc>>, immediate_commits_remaining: Arc>, subscribed_queries: Arc>>, @@ -303,6 +345,7 @@ impl PerspectiveInstance { trigger_notification_check: Arc::new(AtomicBool::new(false)), trigger_prolog_subscription_check: Arc::new(AtomicBool::new(false)), changed_predicates: Arc::new(Mutex::new(ChangedPredicates::NoneRecorded)), + changed_graphs: Arc::new(Mutex::new(ChangedGraphs::NoneRecorded)), commit_debounce_timer: Arc::new(Mutex::new(None)), immediate_commits_remaining: Arc::new(Mutex::new(IMMEDIATE_COMMITS_COUNT)), subscribed_queries: Arc::new(Mutex::new(HashMap::new())), @@ -881,7 +924,7 @@ impl PerspectiveInstance { } pub async fn diff_from_link_language(&self, diff: PerspectiveDiff) -> Result<(), AnyError> { - // Deduplicate by (author, timestamp, source, predicate, target) + // Deduplicate by (author, timestamp, source, predicate, target, graph) // Use structured keys to avoid delimiter collision issues let mut seen_add: std::collections::HashSet = std::collections::HashSet::new(); let mut unique_additions: Vec = Vec::new(); @@ -892,6 +935,7 @@ impl PerspectiveInstance { &link.data.source, link.data.predicate.as_deref().unwrap_or(""), &link.data.target, + link.graph.as_deref().unwrap_or(""), ); let key = serde_json::to_string(&key_tuple).unwrap_or_else(|_| { // Fallback to a simple hash if serialization fails @@ -911,6 +955,7 @@ impl PerspectiveInstance { &link.data.source, link.data.predicate.as_deref().unwrap_or(""), &link.data.target, + link.graph.as_deref().unwrap_or(""), ); let key = serde_json::to_string(&key_tuple).unwrap_or_else(|_| { // Fallback to a simple hash if serialization fails @@ -962,12 +1007,15 @@ impl PerspectiveInstance { status: LinkStatus, batch_id: Option, context: &AgentContext, + graph: Option, ) -> Result { if let Some(ref email) = context.user_email { crate::billing::check_compute_credits(email)?; } link.validate()?; - let link_expr: LinkExpression = create_signed_expression(link.normalize(), context)?.into(); + let mut link_expr: LinkExpression = + create_signed_expression(link.normalize(), context)?.into(); + link_expr.graph = graph; let result = self .add_link_expression(link_expr, status, batch_id) .await?; @@ -1185,6 +1233,7 @@ impl PerspectiveInstance { status: LinkStatus, batch_id: Option, context: &AgentContext, + graph: Option, ) -> Result, AnyError> { if let Some(ref email) = context.user_email { crate::billing::check_compute_credits(email)?; @@ -1194,7 +1243,13 @@ impl PerspectiveInstance { } let link_expressions: Result, _> = links .into_iter() - .map(|l| create_signed_expression(l.normalize(), context).map(LinkExpression::from)) + .map(|l| { + create_signed_expression(l.normalize(), context).map(|e| { + let mut le = LinkExpression::from(e); + le.graph = graph.clone(); + le + }) + }) .collect(); let link_expressions = link_expressions?; @@ -1679,6 +1734,7 @@ impl PerspectiveInstance { signature: decorated.proof.signature, }, status: Some(status.clone()), + graph: decorated.graph, }; (link_expr, status) }) @@ -1834,13 +1890,13 @@ impl PerspectiveInstance { target: sdna_code.clone(), }); - self.add_links(sdna_links, LinkStatus::Shared, None, context) + self.add_links(sdna_links, LinkStatus::Shared, None, context, None) .await?; // Handle SHACL links if SHACL JSON provided explicitly if let Some(shacl) = shacl_json { let shacl_links = parse_shacl_to_links(&shacl, &name)?; - self.add_links(shacl_links, LinkStatus::Shared, None, context) + self.add_links(shacl_links, LinkStatus::Shared, None, context, None) .await?; } @@ -2576,6 +2632,52 @@ impl PerspectiveInstance { self.sparql_store.query(&query) } + /// Execute a SPARQL query with optional graph scoping + pub fn sparql_query_with_graphs( + &self, + query: String, + graphs: Option<&[String]>, + ) -> Result { + self.sparql_store + .query_with_graphs(&query, graphs) + .map_err(|e| e.into()) + } + + /// List all named graph IRIs in this perspective + pub fn named_graphs(&self) -> Result, deno_core::anyhow::Error> { + self.sparql_store.named_graphs().map_err(|e| e.into()) + } + + /// Remove a named graph and all its quads + pub fn remove_graph(&self, graph_iri: &str) -> Result<(), deno_core::anyhow::Error> { + // 1. Find all subject IRIs within this graph (batch — single query) + let subjects_in_graph = self + .sparql_store + .query_with_graphs( + "SELECT DISTINCT ?s WHERE { ?s ?p ?o . FILTER(isIRI(?s)) }", + Some(&[graph_iri.to_string()]), + ) + .unwrap_or_else(|_| "[]".to_string()); + + // Collect subject IRIs + let subject_iris: Vec = + serde_json::from_str::>(&subjects_in_graph) + .unwrap_or_default() + .iter() + .filter_map(|v| v["s"].as_str().map(|s| s.to_string())) + .collect(); + + // 2. Remove the named graph and all its quads (fast — single oxigraph op) + self.sparql_store.remove_named_graph_and_quads(graph_iri)?; + + // 3. Batch-remove incoming links from other graphs targeting deleted subjects + // (single SPARQL query with VALUES clause, not N individual queries) + self.sparql_store + .remove_links_targeting_subjects(&subject_iris)?; + + Ok(()) + } + /// Execute a model query — the executor-side replacement for /// SPARQL-build → hydrate → JS-filter → JS-sort → JS-paginate. pub fn model_query( @@ -2583,6 +2685,7 @@ impl PerspectiveInstance { class_name: &str, query_json: &str, shape_json: Option<&str>, + graph_iris: Option<&[String]>, ) -> Result { let query_input: super::model_query::ModelQueryInput = serde_json::from_str(query_json) .map_err(|e| deno_core::anyhow::anyhow!("Failed to parse model query: {}", e))?; @@ -2592,6 +2695,7 @@ impl PerspectiveInstance { class_name, &query_input, shape_json, + graph_iris, )?; serde_json::to_string(&result).map_err(|e| { @@ -2646,39 +2750,64 @@ impl PerspectiveInstance { Ok(()) } - /// Record the predicates from a diff into `changed_predicates`. - /// `None` means "all predicates changed" (check everything). + /// Record the predicates and graphs from a diff into tracking state. async fn record_changed_predicates(&self, diff: &DecoratedPerspectiveDiff) { let mut changed = self.changed_predicates.lock().await; // If already CheckAll, it's sticky — nothing to do if matches!(*changed, ChangedPredicates::CheckAll) { - return; - } + // Still need to record graphs + } else { + // Collect predicates from the diff + let mut new_preds = HashSet::new(); + let mut has_predicate_less = false; + for link in diff.additions.iter().chain(diff.removals.iter()) { + if let Some(ref pred) = link.data.predicate { + new_preds.insert(pred.clone()); + } else { + has_predicate_less = true; + break; + } + } - // Collect predicates from the diff - let mut new_preds = HashSet::new(); - let mut has_predicate_less = false; - for link in diff.additions.iter().chain(diff.removals.iter()) { - if let Some(ref pred) = link.data.predicate { - new_preds.insert(pred.clone()); + if has_predicate_less { + *changed = ChangedPredicates::CheckAll; } else { - has_predicate_less = true; - break; + match &mut *changed { + ChangedPredicates::NoneRecorded => { + *changed = ChangedPredicates::Specific(new_preds); + } + ChangedPredicates::Specific(existing) => { + existing.extend(new_preds); + } + ChangedPredicates::CheckAll => unreachable!(), + } } } + drop(changed); - if has_predicate_less { - *changed = ChangedPredicates::CheckAll; - } else { - match &mut *changed { - ChangedPredicates::NoneRecorded => { - *changed = ChangedPredicates::Specific(new_preds); + // Record changed graphs + let mut changed_graphs = self.changed_graphs.lock().await; + for link in diff.additions.iter().chain(diff.removals.iter()) { + match &link.graph { + Some(g) => { + match &mut *changed_graphs { + ChangedGraphs::NoneRecorded => { + let mut set = HashSet::new(); + set.insert(g.clone()); + *changed_graphs = ChangedGraphs::Specific(set); + } + ChangedGraphs::Specific(existing) => { + existing.insert(g.clone()); + } + ChangedGraphs::DefaultGraphChanged => { + // Already watching everything + } + } } - ChangedPredicates::Specific(existing) => { - existing.extend(new_preds); + None => { + *changed_graphs = ChangedGraphs::DefaultGraphChanged; } - ChangedPredicates::CheckAll => unreachable!(), } } } @@ -3325,6 +3454,7 @@ impl PerspectiveInstance { parameters: Vec, batch_id: Option, context: &AgentContext, + graph: Option, ) -> Result<(), AnyError> { //let execute_start = std::time::Instant::now(); //log::info!("⚙️ EXECUTE COMMANDS: Starting execution of {} commands for expression '{}', batch_id: {:?}", @@ -3420,6 +3550,7 @@ impl PerspectiveInstance { status, batch_id.clone(), context, + graph.clone(), ) .await?; } @@ -3500,6 +3631,7 @@ impl PerspectiveInstance { status, batch_id.clone(), context, + graph.clone(), ) .await?; } @@ -3541,6 +3673,7 @@ impl PerspectiveInstance { status, batch_id.clone(), context, + graph.clone(), ) .await?; } @@ -3748,6 +3881,7 @@ impl PerspectiveInstance { initial_values: Option, batch_id: Option, context: &AgentContext, + graph: Option, ) -> Result<(), AnyError> { //let create_start = std::time::Instant::now(); //log::info!("🎯 CREATE SUBJECT: Starting create_subject for expression '{}' - batch_id: {:?}", @@ -3816,6 +3950,7 @@ impl PerspectiveInstance { vec![], batch_id.clone(), context, + graph, ) .await?; @@ -4066,6 +4201,13 @@ impl PerspectiveInstance { HashSet::new() // Prolog queries: always re-check }; + // Extract FROM graph IRIs from raw SPARQL subscriptions for automatic graph scoping + let graph_scope = if is_sparql_query(&query) { + extract_from_graph_iris(&query) + } else { + None + }; + let subscribed_query = SubscribedQuery { query, last_result: result_string.clone(), @@ -4073,6 +4215,7 @@ impl PerspectiveInstance { user_email, predicates, model_query_params: None, + graph_scope, }; // Now insert the subscription @@ -4094,9 +4237,15 @@ impl PerspectiveInstance { query_json: String, shape_json: Option, user_email: Option, + graph_iris: Option>, ) -> Result<(String, String), AnyError> { // 1. Run the initial model query - let initial_result = self.model_query(&class_name, &query_json, shape_json.as_deref())?; + let initial_result = self.model_query( + &class_name, + &query_json, + shape_json.as_deref(), + graph_iris.as_deref(), + )?; // 2. Build trigger SPARQL from shape predicates. // Parse the shape to extract required predicates for change detection. @@ -4123,7 +4272,7 @@ impl PerspectiveInstance { let predicate_set: HashSet = trigger_predicates.into_iter().collect(); - // 3. Check for existing subscription with same params + // 3. Check for existing subscription with same params (including graph_iris) let existing_subscription = { let queries = self.subscribed_queries.lock().await; queries @@ -4133,6 +4282,7 @@ impl PerspectiveInstance { params.class_name == class_name && params.query_json == query_json && params.shape_json == shape_json + && params.graph_iris == graph_iris && q.user_email == user_email } else { false @@ -4167,7 +4317,9 @@ impl PerspectiveInstance { class_name, query_json, shape_json, + graph_iris: graph_iris.clone(), }), + graph_scope: graph_iris, }; self.subscribed_queries @@ -4277,7 +4429,11 @@ impl PerspectiveInstance { } } - async fn check_subscribed_queries(&self, changed_predicates: ChangedPredicates) { + async fn check_subscribed_queries( + &self, + changed_predicates: ChangedPredicates, + changed_graphs: ChangedGraphs, + ) { let mut queries_to_remove = Vec::new(); let mut query_futures: Vec< std::pin::Pin> + Send>>, @@ -4285,7 +4441,7 @@ impl PerspectiveInstance { let now = Instant::now(); // Collect only the minimal data needed: ID, query string, user_email, keepalive time, predicates, - // and model_query_params (if this is a model subscription). + // model_query_params, and graph_scope. // DON'T clone the potentially huge last_result string let queries = { let queries = self.subscribed_queries.lock().await; @@ -4299,13 +4455,22 @@ impl PerspectiveInstance { query.last_keepalive, query.predicates.clone(), query.model_query_params.clone(), + query.graph_scope.clone(), ) }) .collect::>() }; // Create futures for each query check - for (id, query_string, user_email, last_keepalive, sub_predicates, model_params) in queries + for ( + id, + query_string, + user_email, + last_keepalive, + sub_predicates, + model_params, + graph_scope, + ) in queries { // Check for timeout if now.duration_since(last_keepalive).as_secs() > QUERY_SUBSCRIPTION_TIMEOUT { @@ -4333,6 +4498,27 @@ impl PerspectiveInstance { continue; } + // Graph scope filtering: if this subscription watches specific graphs, + // skip if none of the changed graphs overlap. + if let Some(ref scope) = graph_scope { + if !scope.is_empty() { + match &changed_graphs { + ChangedGraphs::NoneRecorded => continue, // No graph changes at all + ChangedGraphs::DefaultGraphChanged => { + // Default graph changed — graph-scoped subs might still be + // relevant if predicates overlap. Let query re-evaluate. + } + ChangedGraphs::Specific(changed) => { + let scope_set: HashSet<&str> = + scope.iter().map(|s| s.as_str()).collect(); + if changed.iter().all(|g| !scope_set.contains(g.as_str())) { + continue; // No overlap with watched graphs + } + } + } + } + } + // Each future returns Option<(id, new_result)> instead of locking individually. // This avoids a lock convoy where N futures all contend on subscribed_queries. let self_clone = self.clone(); @@ -4349,6 +4535,7 @@ impl PerspectiveInstance { ¶ms.class_name, ¶ms.query_json, params.shape_json.as_deref(), + params.graph_iris.as_deref(), ) { Ok(r) => r, Err(e) => { @@ -4472,13 +4659,17 @@ impl PerspectiveInstance { &mut *self.changed_predicates.lock().await, ChangedPredicates::NoneRecorded, ); + let changed_graphs = std::mem::replace( + &mut *self.changed_graphs.lock().await, + ChangedGraphs::NoneRecorded, + ); log::debug!( "🔔 Subscription check triggered for perspective {} with changed_preds: {:?}", self.uuid, changed_preds ); - self.check_subscribed_queries(changed_preds).await; + self.check_subscribed_queries(changed_preds, changed_graphs).await; } // Periodic subscription logging @@ -4846,6 +5037,7 @@ mod tests { LinkStatus::Local, None, &AgentContext::main_agent(), + None, ) .await .unwrap(); @@ -4888,6 +5080,7 @@ mod tests { LinkStatus::Shared, None, &AgentContext::main_agent(), + None, ) .await .unwrap(); @@ -4925,7 +5118,13 @@ mod tests { // Add a link to the perspective let expression = perspective - .add_link(link.clone(), status, None, &AgentContext::main_agent()) + .add_link( + link.clone(), + status, + None, + &AgentContext::main_agent(), + None, + ) .await .unwrap(); @@ -5074,6 +5273,7 @@ mod tests { LinkStatus::Shared, Some(batch_id.clone()), &AgentContext::main_agent(), + None, ) .await .unwrap(); @@ -5107,6 +5307,7 @@ mod tests { LinkStatus::Shared, None, &AgentContext::main_agent(), + None, ) .await .unwrap(); @@ -5153,6 +5354,7 @@ mod tests { LinkStatus::Shared, None, &AgentContext::main_agent(), + None, ) .await .unwrap(); @@ -5171,6 +5373,7 @@ mod tests { LinkStatus::Shared, Some(batch_id.clone()), &AgentContext::main_agent(), + None, ) .await .unwrap(); @@ -5180,6 +5383,7 @@ mod tests { LinkStatus::Shared, Some(batch_id.clone()), &AgentContext::main_agent(), + None, ) .await .unwrap(); @@ -5229,6 +5433,7 @@ mod tests { }, proof: Default::default(), status: None, + graph: None, }; let result = perspective .remove_link(non_existent_link.clone(), Some(batch_id.clone())) @@ -5242,6 +5447,7 @@ mod tests { LinkStatus::Shared, Some("invalid".to_string()), &AgentContext::main_agent(), + None, ) .await; assert!(result.is_err()); @@ -5278,6 +5484,7 @@ mod tests { vec![], Some(batch_id.clone()), &AgentContext::main_agent(), + None, ) .await .unwrap(); diff --git a/rust-executor/src/perspectives/shacl_to_prolog.rs b/rust-executor/src/perspectives/shacl_to_prolog.rs index aee1d965a..644cf2645 100644 --- a/rust-executor/src/perspectives/shacl_to_prolog.rs +++ b/rust-executor/src/perspectives/shacl_to_prolog.rs @@ -278,6 +278,7 @@ mod tests { invalid: None, }, status: None, + graph: None, } } diff --git a/rust-executor/src/perspectives/sparql_store.rs b/rust-executor/src/perspectives/sparql_store.rs index 7456b5a28..0d6eb78c4 100644 --- a/rust-executor/src/perspectives/sparql_store.rs +++ b/rust-executor/src/perspectives/sparql_store.rs @@ -7,7 +7,8 @@ use oxigraph::sparql::{QueryResults, SparqlEvaluator}; use oxigraph::store::Store; use serde_json::Value; use sha2::{Digest, Sha256}; -use std::sync::Arc; +use std::collections::HashSet; +use std::sync::{Arc, Mutex}; const ONT_AUTHOR: &str = "ad4m://ontology/author"; const ONT_TIMESTAMP: &str = "ad4m://ontology/timestamp"; @@ -94,6 +95,11 @@ fn strip_html_fn(args: &[Term]) -> Option { Some(Literal::new_simple_literal(&result).into()) } +/// Construct the canonical named-graph IRI for a given base expression. +pub fn make_graph_iri(base_expression: &str) -> String { + format!("ad4m://graph/{}", base_expression) +} + /// Validates that a SPARQL query is read-only by parsing it with the SPARQL parser. /// Only SELECT, ASK, CONSTRUCT, and DESCRIBE queries are accepted. /// UPDATE operations (INSERT, DELETE, DROP, etc.) will fail to parse as a Query. @@ -143,6 +149,8 @@ fn make_direct_triple(link: &DecoratedLinkExpression) -> (NamedNode, NamedNode, #[derive(Clone)] pub struct SparqlStore { store: Arc, + /// Cache of registered named graphs to avoid redundant insert_named_graph calls. + registered_graphs: Arc>>, } impl SparqlStore { @@ -174,6 +182,7 @@ impl SparqlStore { }; Ok(SparqlStore { store: Arc::new(store), + registered_graphs: Arc::new(Mutex::new(HashSet::new())), }) } @@ -193,12 +202,31 @@ impl SparqlStore { let (source_iri, predicate_iri, target_iri) = make_direct_triple(link); let reifier_iri = make_reifier_iri(link); - // 1. Direct triple in default graph + // Resolve graph from link's graph field + let graph_node = link.graph.as_ref().map(|iri| NamedNode::new_unchecked(iri)); + let graph_ref = match &graph_node { + Some(node) => GraphNameRef::NamedNode(node.as_ref()), + None => GraphNameRef::DefaultGraph, + }; + + // If targeting a named graph, ensure it is registered (cached) + if let Some(ref node) = graph_node { + if self + .registered_graphs + .lock() + .unwrap() + .insert(node.as_str().to_string()) + { + let _ = self.store.insert_named_graph(node.as_ref()); + } + } + + // 1. Direct triple in resolved graph self.store.insert(QuadRef::new( source_iri.as_ref(), predicate_iri.as_ref(), TermRef::NamedNode(target_iri.as_ref()), - GraphNameRef::DefaultGraph, + graph_ref, ))?; // 2. Reifier: rdf:reifies <<( source predicate target )>> @@ -212,10 +240,10 @@ impl SparqlStore { reifier_iri.as_ref(), rdf_reifies, TermRef::Triple(&triple_term), - GraphNameRef::DefaultGraph, + graph_ref, ))?; - // 3. Metadata on the reifier node (all default graph) + // 3. Metadata on the reifier node (all in resolved graph) let proof = &link.proof; let valid_str = proof.valid.unwrap_or(false).to_string(); @@ -235,7 +263,7 @@ impl SparqlStore { reifier_iri.as_ref(), pred, TermRef::Literal(lit.as_ref()), - GraphNameRef::DefaultGraph, + graph_ref, ))?; } @@ -251,21 +279,28 @@ impl SparqlStore { pub fn remove_link(&self, link: &DecoratedLinkExpression) -> Result<(), Error> { let reifier_iri = make_reifier_iri(link); - // 1. Remove all quads where reifier is subject (metadata + rdf:reifies) + // Resolve graph scope from link + let graph_node = link.graph.as_ref().map(|iri| NamedNode::new_unchecked(iri)); + let graph_ref = match &graph_node { + Some(node) => GraphNameRef::NamedNode(node.as_ref()), + None => GraphNameRef::DefaultGraph, + }; + + // 1. Remove all quads where reifier is subject IN THIS GRAPH let quads: Vec<_> = self .store .quads_for_pattern( Some(reifier_iri.as_ref().into()), None, None, - Some(GraphNameRef::DefaultGraph), + Some(graph_ref), ) .collect::, _>>()?; for quad in &quads { self.store.remove(quad)?; } - // 2. Remove the direct triple IF no other reifier references it + // 2. Remove the direct triple IF no other reifier references it IN THIS GRAPH let (source, predicate, target) = make_direct_triple(link); let triple_term = Triple::new(source.clone(), predicate.clone(), target.clone()); let rdf_reifies = NamedNodeRef::new_unchecked(RDF_REIFIES); @@ -276,7 +311,7 @@ impl SparqlStore { None, Some(rdf_reifies), Some(TermRef::Triple(&triple_term)), - None, + Some(graph_ref), ) .next() .is_some(); @@ -286,7 +321,7 @@ impl SparqlStore { source.as_ref(), predicate.as_ref(), TermRef::NamedNode(target.as_ref()), - GraphNameRef::DefaultGraph, + graph_ref, ))?; } @@ -295,6 +330,77 @@ impl SparqlStore { /// Return all links in the store using a SPARQL 1.2 reifier query. pub fn get_all_links(&self) -> Result, Error> { + // Include all named graphs in the default graph so all links are visible + if self.has_named_graphs() { + // Query default graph links (no graph field) + let default_query = r#" + PREFIX rdf: + SELECT ?source ?predicate ?target ?author ?timestamp ?proofKey ?proofSig ?proofValid ?status WHERE { + ?source ?predicate ?target . + ?reifier rdf:reifies <<( ?source ?predicate ?target )>> . + FILTER(isIRI(?source) && isIRI(?predicate)) + ?reifier ?author . + ?reifier ?timestamp . + OPTIONAL { ?reifier ?proofKey . } + OPTIONAL { ?reifier ?proofSig . } + OPTIONAL { ?reifier ?proofValid . } + OPTIONAL { ?reifier ?status . } + } + "#; + let default_parsed = oxigraph::sparql::Query::parse(default_query, None) + .map_err(|e| anyhow!("Failed to parse get_all_links query: {}", e))?; + #[allow(deprecated)] + let default_results = self + .store + .query_opt(default_parsed, self.sparql_evaluator())?; + + let mut links = Vec::new(); + if let QueryResults::Solutions(solutions) = default_results { + for solution in solutions { + let solution = solution?; + if let Some(link) = self.link_from_solution(&solution) { + links.push(link); + } + } + } + + // Query named graph links (with graph field preserved via GRAPH ?g) + let named_query = r#" + PREFIX rdf: + SELECT ?source ?predicate ?target ?author ?timestamp ?proofKey ?proofSig ?proofValid ?status ?graph WHERE { + GRAPH ?g { + ?source ?predicate ?target . + ?reifier rdf:reifies <<( ?source ?predicate ?target )>> . + FILTER(isIRI(?source) && isIRI(?predicate)) + ?reifier ?author . + ?reifier ?timestamp . + OPTIONAL { ?reifier ?proofKey . } + OPTIONAL { ?reifier ?proofSig . } + OPTIONAL { ?reifier ?proofValid . } + OPTIONAL { ?reifier ?status . } + } + BIND(STR(?g) AS ?graph) + } + "#; + let named_parsed = oxigraph::sparql::Query::parse(named_query, None) + .map_err(|e| anyhow!("Failed to parse get_all_links named query: {}", e))?; + #[allow(deprecated)] + let named_results = self + .store + .query_opt(named_parsed, self.sparql_evaluator())?; + + if let QueryResults::Solutions(solutions) = named_results { + for solution in solutions { + let solution = solution?; + if let Some(link) = self.link_from_solution(&solution) { + links.push(link); + } + } + } + + return Ok(links); + } + let query = r#" PREFIX rdf: SELECT ?source ?predicate ?target ?author ?timestamp ?proofKey ?proofSig ?proofValid ?status WHERE { @@ -310,10 +416,12 @@ impl SparqlStore { } "#; - let results = self + let mut prepared = self .sparql_evaluator() .parse_query(query) - .map_err(|e| anyhow!("Failed to parse get_all_links query: {}", e))? + .map_err(|e| anyhow!("Failed to parse get_all_links query: {}", e))?; + + let results = prepared .on_store(&self.store) .execute() .map_err(|e| anyhow!("get_all_links query failed: {}", e))?; @@ -334,7 +442,8 @@ impl SparqlStore { } /// Query links matching optional filters using index-based pattern matching. - /// Scans direct triples in the default graph, then looks up reifiers for metadata. + /// Scans direct triples, then looks up reifiers for metadata. + /// When named graphs exist, scans all graphs and populates the graph field on results. pub fn query_links( &self, source: Option<&str>, @@ -361,13 +470,25 @@ impl SparqlStore { let ont_proof_valid = NamedNodeRef::new_unchecked(ONT_PROOF_VALID); let ont_status = NamedNodeRef::new_unchecked(ONT_STATUS); - // Search direct triples in the default graph - for quad_result in - self.store - .quads_for_pattern(s_ref, p_ref, t_ref, Some(GraphNameRef::DefaultGraph)) + // Scan all graphs when named graphs exist, otherwise default graph only + let graph_filter = if self.has_named_graphs() { + None + } else { + Some(GraphNameRef::DefaultGraph) + }; + + for quad_result in self + .store + .quads_for_pattern(s_ref, p_ref, t_ref, graph_filter) { let quad = quad_result?; + // Capture which graph this quad is in + let quad_graph = match &quad.graph_name { + GraphName::NamedNode(n) => Some(n.as_str().to_string()), + _ => None, // DefaultGraph + }; + // Skip reifier and metadata predicates — only process data triples let pred_str = quad.predicate.as_str(); if pred_str == RDF_REIFIES || pred_str.starts_with("ad4m://ontology/") { @@ -391,12 +512,17 @@ impl SparqlStore { quad.object.clone(), ); - // Find all reifiers for this triple + // Find all reifiers for this triple (in the same graph) + let reifier_graph_ref = match &quad.graph_name { + GraphName::NamedNode(n) => GraphNameRef::NamedNode(n.as_ref()), + _ => GraphNameRef::DefaultGraph, + }; + for reifier_quad in self.store.quads_for_pattern( None, Some(rdf_reifies), Some(TermRef::Triple(&triple_term)), - Some(GraphNameRef::DefaultGraph), + Some(reifier_graph_ref), ) { let rq = reifier_quad?; let reifier_node = match &rq.subject { @@ -412,7 +538,7 @@ impl SparqlStore { Some(reifier_subject), Some(pred_node), None, - Some(GraphNameRef::DefaultGraph), + Some(reifier_graph_ref), ) .next() .and_then(|r| r.ok()) @@ -504,6 +630,7 @@ impl SparqlStore { invalid: proof_valid.map(|v| !v), }, status, + graph: quad_graph.clone(), }); if let Some(lim) = limit { @@ -628,6 +755,13 @@ impl SparqlStore { _ => None, }; + let graph_val = get_str("graph"); + let graph = if graph_val.is_empty() { + None + } else { + Some(graph_val) + }; + Some(DecoratedLinkExpression { author, timestamp, @@ -643,24 +777,98 @@ impl SparqlStore { invalid: proof_valid.map(|v| !v), }, status, + graph, }) } /// Execute an arbitrary read-only SPARQL SELECT query, returning a JSON string. - /// All data lives in the default graph — no union graph needed. + /// Delegates to query_with_graphs with no graph scope (backward-compatible). pub fn query(&self, query_string: &str) -> Result { + self.query_with_graphs(query_string, None) + } + + /// Execute a SPARQL query with optional graph scoping. + /// + /// Dataset resolution logic (in priority order): + /// 1. If the query already declares its dataset via `FROM` / `FROM NAMED` / + /// `GRAPH` — respect it as-is (query is self-describing). + /// 2. If `graph_iris: Some(&[...])` is provided externally — set those graphs + /// as the default dataset. + /// 3. If the store has named graphs — union all graphs as default (backward-compat). + /// 4. Otherwise — use the store's default graph directly (fast path). + pub fn query_with_graphs( + &self, + query_string: &str, + graph_iris: Option<&[String]>, + ) -> Result { validate_readonly_query(query_string)?; - let results = self - .sparql_evaluator() - .parse_query(query_string) - .map_err(|e| anyhow!("Failed to parse SPARQL query: {}", e))? - .on_store(&self.store) - .execute() - .map_err(|e| { - let truncated = &query_string[..query_string.len().min(500)]; - anyhow!("SPARQL query failed: {}\nQuery: {}", e, truncated) - })?; + // Parse the query to inspect its dataset declaration + let parsed_query = oxigraph::sparql::Query::parse(query_string, None) + .map_err(|e| anyhow!("Failed to parse SPARQL query: {}", e))?; + + // Check if the query already declares its own dataset (FROM / FROM NAMED) + let query_has_dataset = !parsed_query.dataset().is_default_dataset(); + + // Also detect GRAPH keyword usage in the query text (indicates the user + // is writing cross-graph patterns and knows what they're doing). + // Uses word boundary + following `<` or `?` to avoid false positives from + // string literals, comments, or IRIs containing "GRAPH". + let query_has_graph_keyword = regex::Regex::new(r"(?i)\bGRAPH\s*[<\?]") + .unwrap() + .is_match(query_string); + + let results = if query_has_dataset || query_has_graph_keyword { + // Query is self-describing — respect its dataset declarations. + // This supports FROM clauses (model-generated), GRAPH patterns + // (cross-graph queries), and FROM NAMED (federated patterns). + #[allow(deprecated)] + self.store + .query_opt(parsed_query, self.sparql_evaluator()) + .map_err(|e| { + let truncated = &query_string[..query_string.len().min(500)]; + anyhow!("SPARQL query failed: {}\nQuery: {}", e, truncated) + })? + } else if let Some(iris) = graph_iris.filter(|i| !i.is_empty()) { + // External graph scoping — override the default graph + let mut q = parsed_query; + let graph_names: Vec = iris + .iter() + .map(|iri| GraphName::NamedNode(NamedNode::new_unchecked(iri))) + .collect(); + q.dataset_mut().set_default_graph(graph_names); + + #[allow(deprecated)] + self.store + .query_opt(q, self.sparql_evaluator()) + .map_err(|e| { + let truncated = &query_string[..query_string.len().min(500)]; + anyhow!("SPARQL query failed: {}\nQuery: {}", e, truncated) + })? + } else if self.has_named_graphs() { + // No explicit scoping but store has named graphs — union all + let mut q = parsed_query; + q.dataset_mut().set_default_graph_as_union(); + + #[allow(deprecated)] + self.store + .query_opt(q, self.sparql_evaluator()) + .map_err(|e| { + let truncated = &query_string[..query_string.len().min(500)]; + anyhow!("SPARQL query failed: {}\nQuery: {}", e, truncated) + })? + } else { + // Simple case: no named graphs, no scoping needed + self.sparql_evaluator() + .parse_query(query_string) + .map_err(|e| anyhow!("Failed to parse SPARQL query: {}", e))? + .on_store(&self.store) + .execute() + .map_err(|e| { + let truncated = &query_string[..query_string.len().min(500)]; + anyhow!("SPARQL query failed: {}\nQuery: {}", e, truncated) + })? + }; match results { QueryResults::Solutions(solutions) => { @@ -713,6 +921,59 @@ impl SparqlStore { } } + /// Check whether the store contains any named graphs. + pub fn has_named_graphs(&self) -> bool { + self.store + .named_graphs() + .next() + .transpose() + .ok() + .flatten() + .is_some() + } + + /// List all named graph IRIs in the store. + pub fn named_graphs(&self) -> Result, Error> { + let mut graphs = Vec::new(); + for result in self.store.named_graphs() { + let term = result?; + if let NamedOrBlankNode::NamedNode(n) = term { + graphs.push(n.into_string()); + } + } + Ok(graphs) + } + + /// Check whether a named graph exists. + pub fn contains_named_graph(&self, iri: &str) -> bool { + let node = NamedNode::new_unchecked(iri); + self.store + .contains_named_graph(node.as_ref()) + .unwrap_or(false) + } + + /// Register an (empty) named graph. Idempotent (cached). + pub fn create_named_graph(&self, iri: &str) -> Result<(), Error> { + if self + .registered_graphs + .lock() + .unwrap() + .insert(iri.to_string()) + { + let node = NamedNode::new_unchecked(iri); + let _ = self.store.insert_named_graph(node.as_ref()); + } + Ok(()) + } + + /// Remove a named graph and all its quads. No-op if graph doesn't exist. + pub fn remove_named_graph_and_quads(&self, iri: &str) -> Result<(), Error> { + let node = NamedNode::new_unchecked(iri); + let _ = self.store.remove_named_graph(node.as_ref()); + self.registered_graphs.lock().unwrap().remove(iri); + Ok(()) + } + /// Remove all triples from the store. pub fn clear(&self) -> Result<(), Error> { self.store.clear()?; @@ -727,6 +988,103 @@ impl SparqlStore { } Ok(()) } + /// Batch-remove all links (across all graphs) that target any of the given IRIs. + /// Uses a single SPARQL query with VALUES clause instead of N individual queries. + pub fn remove_links_targeting_subjects(&self, subject_iris: &[String]) -> Result<(), Error> { + if subject_iris.is_empty() { + return Ok(()); + } + + // Build VALUES clause for batch lookup + let values = subject_iris + .iter() + .map(|s| format!("<{}>", s)) + .collect::>() + .join(" "); + + // Find all reifiers of triples targeting our subjects (in any graph) + let query = format!( + r#"PREFIX rdf: + SELECT ?reifier ?s ?p ?target ?g WHERE {{ + GRAPH ?g {{ + ?s ?p ?target . + ?reifier rdf:reifies <<( ?s ?p ?target )>> . + }} + VALUES ?target {{ {} }} + FILTER(isIRI(?s) && isIRI(?p)) + }}"#, + values + ); + + // Execute as union across all graphs + let mut q = oxigraph::sparql::Query::parse(&query, None) + .map_err(|e| anyhow!("Failed to parse batch cleanup query: {}", e))?; + q.dataset_mut().set_default_graph_as_union(); + + #[allow(deprecated)] + let results = self + .store + .query_opt(q, self.sparql_evaluator()) + .map_err(|e| anyhow!("Batch cleanup query failed: {}", e))?; + + if let QueryResults::Solutions(solutions) = results { + for solution in solutions { + let solution = solution?; + if let (Some(Term::NamedNode(reifier)), Some(Term::NamedNode(graph))) = + (solution.get("reifier").cloned(), solution.get("g").cloned()) + { + // Remove all quads where reifier is subject in that graph + let graph_ref = GraphNameRef::NamedNode(graph.as_ref()); + let quads: Vec<_> = self + .store + .quads_for_pattern( + Some(reifier.as_ref().into()), + None, + None, + Some(graph_ref), + ) + .collect::, _>>()?; + for quad in &quads { + let _ = self.store.remove(quad); + } + + // Remove the direct triple if no other reifier references it + if let ( + Some(Term::NamedNode(s)), + Some(Term::NamedNode(p)), + Some(Term::NamedNode(t)), + ) = ( + solution.get("s").cloned(), + solution.get("p").cloned(), + solution.get("target").cloned(), + ) { + let triple_term = Triple::new(s.clone(), p.clone(), t.clone()); + let rdf_reifies = NamedNodeRef::new_unchecked(RDF_REIFIES); + let still_referenced = self + .store + .quads_for_pattern( + None, + Some(rdf_reifies), + Some(TermRef::Triple(&triple_term)), + Some(graph_ref), + ) + .next() + .is_some(); + if !still_referenced { + let _ = self.store.remove(QuadRef::new( + s.as_ref(), + p.as_ref(), + TermRef::NamedNode(t.as_ref()), + graph_ref, + )); + } + } + } + } + } + + Ok(()) + } /// Check the migration version stored in the store. /// Returns 0 if no migration version is found. @@ -900,6 +1258,7 @@ impl SparqlStore { invalid: proof_valid.map(|v| !v), }, status, + graph: None, }); } } @@ -982,6 +1341,7 @@ mod tests { invalid: Some(false), }, status: Some(LinkStatus::Shared), + graph: None, } } @@ -2341,592 +2701,311 @@ mod tests { ); } - // ── Summarisation Pipeline SPARQL Tests ── - // - // These tests reproduce the exact SPARQL queries used by the Flux - // summarisation pipeline against a real Oxigraph store with reifier - // storage to catch edge cases that mocked tests miss. - - /// Helper: set up a conversation channel with messages in the store. - /// Returns (channel_id, conversation_id, message_ids). - fn setup_conversation_channel( - svc: &SparqlStore, - channel_id: &str, - is_conversation: bool, - message_count: usize, - ) -> (String, String, Vec) { - // Channel entry_type flag - svc.add_link(&make_link( - channel_id, - "flux://entry_type", - "flux://has_channel", - )) - .unwrap(); - - // isConversation property - if is_conversation { - svc.add_link(&make_link( - channel_id, - "flux://channel_is_conversation", - "true", - )) - .unwrap(); - } - - // Conversation entity as child of channel - let conv_id = format!("{}-conv", channel_id); - svc.add_link(&make_link(channel_id, "ad4m://has_child", &conv_id)) - .unwrap(); - svc.add_link(&make_link( - &conv_id, - "flux://entry_type", - "flux://conversation", - )) - .unwrap(); - - // Channel creation link (parent -> channel) - svc.add_link(&make_link("ad4m://self", "flux://has_channel", channel_id)) - .unwrap(); - - // Messages as children of channel - let mut msg_ids = Vec::new(); - for i in 0..message_count { - let msg_id = format!("{}-msg-{}", channel_id, i); - let link = make_link_with_ts( - channel_id, - "ad4m://has_child", - &msg_id, - &format!("2026-01-15T10:{:02}:00.000Z", i), - "did:key:alice", - ); - svc.add_link(&link).unwrap(); - svc.add_link(&make_link( - &msg_id, - "flux://entry_type", - "flux://has_message", - )) - .unwrap(); - svc.add_link(&make_link( - &msg_id, - "flux://body", - &format!("literal:string:Message%20{}", i), - )) - .unwrap(); - msg_ids.push(msg_id); - } - - (channel_id.to_string(), conv_id, msg_ids) - } + // ── Named Graph Tests ── #[test] - fn test_recent_conversations_sparql_basic() { - // Tests the exact SPARQL query from Channel.recentConversations() - // against the real store with reifier storage model. + fn test_named_graph_insert_and_query() { let svc = new_service(); - setup_conversation_channel(&svc, "flux://ch1", true, 3); - - let query = r#" - PREFIX rdf: - SELECT ?channelId (SAMPLE(?cId) AS ?conversationId) (MAX(?ts) AS ?lastActivity) WHERE { - ?channelId . - ?channelId ?_isConv . - FILTER(STR((?_isConv)) = "true") - OPTIONAL { - ?channelId ?cId . - ?cId . - } - OPTIONAL { - ?channelId ?item . - ?_itemReifier rdf:reifies <<( ?channelId ?item )>> . - ?_itemReifier ?itemTs . - ?item ?itemType . - FILTER(?itemType IN (, )) - } - OPTIONAL { - ?_chanReifier rdf:reifies <<( ?_parent ?channelId )>> . - ?_chanReifier ?chanCreatedTs . - } - BIND(COALESCE(?itemTs, ?chanCreatedTs, "1970-01-01T00:00:00Z"^^) AS ?ts) - } - GROUP BY ?channelId - ORDER BY DESC(?lastActivity) - LIMIT 20 - "#; + let mut link = make_link("ad4m://source1", "ad4m://predicate1", "ad4m://target1"); + link.graph = Some("ad4m://graph/entity1".to_string()); + svc.add_link(&link).unwrap(); - let result = svc.query(query).unwrap(); - let rows: Vec = serde_json::from_str(&result).unwrap(); + // Should be visible in graph-scoped query + let result = svc + .query_with_graphs( + "SELECT ?s ?p ?o WHERE { ?s ?p ?o }", + Some(&["ad4m://graph/entity1".to_string()]), + ) + .unwrap(); assert!( - !rows.is_empty(), - "recentConversations query returned no results. Raw: {}", - result - ); - assert_eq!( - rows[0]["channelId"].as_str().unwrap(), - "flux://ch1", - "Expected channel ID flux://ch1, got: {:?}", - rows[0] - ); - assert_eq!( - rows[0]["conversationId"].as_str().unwrap(), - "flux://ch1-conv", - "Expected conversation ID, got: {:?}", - rows[0] + result.contains("ad4m://source1"), + "Graph-scoped query should find the link" ); - // lastActivity should be set (from message timestamps) - let last_activity = rows[0]["lastActivity"].as_str().unwrap_or(""); + + // Should be visible in unscoped query (union all graphs) + let result = svc.query("SELECT ?s ?p ?o WHERE { ?s ?p ?o }").unwrap(); assert!( - !last_activity.is_empty() && last_activity != "1970-01-01T00:00:00Z", - "Expected valid lastActivity timestamp, got: {}", - last_activity + result.contains("ad4m://source1"), + "Unscoped query should find the link via union" ); } #[test] - fn test_recent_conversations_non_conversation_channel_excluded() { - // A channel without isConversation=true should NOT appear + fn test_named_graph_scoped_query_excludes_other_graphs() { let svc = new_service(); - setup_conversation_channel(&svc, "flux://ch-space", false, 5); - setup_conversation_channel(&svc, "flux://ch-conv", true, 2); - let query = r#" - PREFIX rdf: - SELECT ?channelId (SAMPLE(?cId) AS ?conversationId) (MAX(?ts) AS ?lastActivity) WHERE { - ?channelId . - ?channelId ?_isConv . - FILTER(STR((?_isConv)) = "true") - OPTIONAL { - ?channelId ?cId . - ?cId . - } - OPTIONAL { - ?channelId ?item . - ?_itemReifier rdf:reifies <<( ?channelId ?item )>> . - ?_itemReifier ?itemTs . - ?item ?itemType . - FILTER(?itemType IN (, )) - } - OPTIONAL { - ?_chanReifier rdf:reifies <<( ?_parent ?channelId )>> . - ?_chanReifier ?chanCreatedTs . - } - BIND(COALESCE(?itemTs, ?chanCreatedTs, "1970-01-01T00:00:00Z"^^) AS ?ts) - } - GROUP BY ?channelId - ORDER BY DESC(?lastActivity) - LIMIT 20 - "#; + let mut link_a = make_link("ad4m://src_a", "ad4m://pred", "ad4m://tgt_a"); + link_a.graph = Some("ad4m://graph/graphA".to_string()); + svc.add_link(&link_a).unwrap(); - let result = svc.query(query).unwrap(); - let rows: Vec = serde_json::from_str(&result).unwrap(); - let channel_ids: Vec<&str> = rows - .iter() - .filter_map(|r| r["channelId"].as_str()) - .collect(); + let mut link_b = make_link("ad4m://src_b", "ad4m://pred", "ad4m://tgt_b"); + link_b.graph = Some("ad4m://graph/graphB".to_string()); + svc.add_link(&link_b).unwrap(); + + // Query scoped to graphA should only see link_a + let result = svc + .query_with_graphs( + "SELECT ?s WHERE { ?s ?o }", + Some(&["ad4m://graph/graphA".to_string()]), + ) + .unwrap(); + assert!(result.contains("ad4m://src_a"), "Should see link in graphA"); assert!( - !channel_ids.contains(&"flux://ch-space"), - "Non-conversation channel should be excluded. Got: {:?}", - channel_ids + !result.contains("ad4m://src_b"), + "Should NOT see link in graphB when scoped to graphA" ); + + // Query scoped to graphB should only see link_b + let result = svc + .query_with_graphs( + "SELECT ?s WHERE { ?s ?o }", + Some(&["ad4m://graph/graphB".to_string()]), + ) + .unwrap(); + assert!(result.contains("ad4m://src_b"), "Should see link in graphB"); assert!( - channel_ids.contains(&"flux://ch-conv"), - "Conversation channel should be included. Got: {:?}", - channel_ids + !result.contains("ad4m://src_a"), + "Should NOT see link in graphA when scoped to graphB" ); } #[test] - fn test_recent_conversations_boolean_literal_prefix() { - // Tests that isConversation stored as "literal:boolean:true" also works + fn test_bulk_delete_via_remove_named_graph() { let svc = new_service(); - svc.add_link(&make_link( - "flux://ch-lit", - "flux://entry_type", - "flux://has_channel", - )) - .unwrap(); - svc.add_link(&make_link( - "flux://ch-lit", - "flux://channel_is_conversation", - "literal:boolean:true", - )) - .unwrap(); - let query = r#" - SELECT ?channelId WHERE { - ?channelId . - ?channelId ?_isConv . - FILTER(STR((?_isConv)) = "true") - } - "#; + // Add 3 links to a named graph + for i in 0..3 { + let mut link = make_link( + &format!("ad4m://src{}", i), + "ad4m://pred", + &format!("ad4m://tgt{}", i), + ); + link.graph = Some("ad4m://graph/bulk_test".to_string()); + svc.add_link(&link).unwrap(); + } - let result = svc.query(query).unwrap(); - let rows: Vec = serde_json::from_str(&result).unwrap(); - assert!( - !rows.is_empty(), - "literal:boolean:true should pass parse_literal filter. Raw: {}", - result + // Add 1 link to default graph (should survive) + let default_link = make_link("ad4m://default_src", "ad4m://pred", "ad4m://default_tgt"); + svc.add_link(&default_link).unwrap(); + + // Verify all 4 links exist + let all = svc.query_links(None, None, None, None, None, None).unwrap(); + assert_eq!(all.len(), 4, "Should have 4 total links before bulk delete"); + + // Bulk delete the named graph + svc.remove_named_graph_and_quads("ad4m://graph/bulk_test") + .unwrap(); + + // Only default graph link should remain + let all = svc.query_links(None, None, None, None, None, None).unwrap(); + assert_eq!( + all.len(), + 1, + "Only default graph link should survive bulk delete" ); + assert_eq!(all[0].data.source, "ad4m://default_src"); } #[test] - fn test_recent_conversations_empty_channel_no_messages() { - // Channel with isConversation=true but no messages should still appear - // (using channel creation timestamp as fallback) + fn test_cross_graph_duplicate_triple() { let svc = new_service(); - setup_conversation_channel(&svc, "flux://ch-empty", true, 0); - let query = r#" - PREFIX rdf: - SELECT ?channelId (SAMPLE(?cId) AS ?conversationId) (MAX(?ts) AS ?lastActivity) WHERE { - ?channelId . - ?channelId ?_isConv . - FILTER(STR((?_isConv)) = "true") - OPTIONAL { - ?channelId ?cId . - ?cId . - } - OPTIONAL { - ?channelId ?item . - ?_itemReifier rdf:reifies <<( ?channelId ?item )>> . - ?_itemReifier ?itemTs . - ?item ?itemType . - FILTER(?itemType IN (, )) - } - OPTIONAL { - ?_chanReifier rdf:reifies <<( ?_parent ?channelId )>> . - ?_chanReifier ?chanCreatedTs . - } - BIND(COALESCE(?itemTs, ?chanCreatedTs, "1970-01-01T00:00:00Z"^^) AS ?ts) - } - GROUP BY ?channelId - ORDER BY DESC(?lastActivity) - LIMIT 20 - "#; + // Same (s,p,o) in two different graphs + let mut link_default = make_link("ad4m://entity", "ad4m://name", "ad4m://val"); + link_default.timestamp = "2024-01-15T10:00:00.000Z".to_string(); + link_default.proof.signature = "sig_default".to_string(); + svc.add_link(&link_default).unwrap(); - let result = svc.query(query).unwrap(); - let rows: Vec = serde_json::from_str(&result).unwrap(); + let mut link_named = make_link("ad4m://entity", "ad4m://name", "ad4m://val"); + link_named.timestamp = "2024-01-15T11:00:00.000Z".to_string(); + link_named.proof.signature = "sig_named".to_string(); + link_named.graph = Some("ad4m://graph/g1".to_string()); + svc.add_link(&link_named).unwrap(); + + // Unscoped query should see both (different reifiers) + let all = svc + .query_links(None, Some("ad4m://name"), None, None, None, None) + .unwrap(); + assert_eq!(all.len(), 2, "Should see both links (different reifiers)"); + + // Scoped to g1 should see only the named one + let result = svc + .query_with_graphs( + "SELECT ?s WHERE { ?s }", + Some(&["ad4m://graph/g1".to_string()]), + ) + .unwrap(); + assert!(result.contains("ad4m://entity")); + + // Remove the named graph — only default link should remain + svc.remove_named_graph_and_quads("ad4m://graph/g1").unwrap(); + let all = svc + .query_links(None, Some("ad4m://name"), None, None, None, None) + .unwrap(); + assert_eq!( + all.len(), + 1, + "Only default graph link should remain after named graph removal" + ); assert!( - !rows.is_empty(), - "Empty conversation channel should still appear in results. Raw: {}", - result + all[0].graph.is_none(), + "Remaining link should be in default graph" ); } #[test] - fn test_unprocessed_items_sparql_basic() { - // Tests the exact SPARQL queries from Channel.unprocessedItems() + fn test_graph_field_preserved_through_query_links() { let svc = new_service(); - let (ch_id, _, msg_ids) = setup_conversation_channel(&svc, "flux://ch-unproc", true, 5); - - // Query 1: all items - let all_items_query = format!( - r#"SELECT ?id WHERE {{ - <{}> ?id . - ?id ?type . - FILTER(?type IN (, , )) - }}"#, - ch_id - ); - let result = svc.query(&all_items_query).unwrap(); - let rows: Vec = serde_json::from_str(&result).unwrap(); - assert_eq!( - rows.len(), - 5, - "Expected 5 items, got {}. Raw: {}", - rows.len(), - result - ); - // Mark some as processed (add to subgroup) - let sg_id = "flux://ch-unproc-sg-1"; - svc.add_link(&make_link( - sg_id, - "flux://entry_type", - "flux://conversation_subgroup", - )) - .unwrap(); - svc.add_link(&make_link(sg_id, "flux://has_item", &msg_ids[0])) - .unwrap(); - svc.add_link(&make_link(sg_id, "flux://has_item", &msg_ids[1])) - .unwrap(); + let mut link = make_link("ad4m://src", "ad4m://pred", "ad4m://tgt"); + link.graph = Some("ad4m://graph/test_preserve".to_string()); + svc.add_link(&link).unwrap(); - // Query 2: processed items - let processed_query = r#"SELECT ?id WHERE { - ?sg ?id . - ?sg . - }"#; - let result = svc.query(processed_query).unwrap(); - let processed: Vec = serde_json::from_str(&result).unwrap(); + let results = svc.query_links(None, None, None, None, None, None).unwrap(); + assert_eq!(results.len(), 1); assert_eq!( - processed.len(), - 2, - "Expected 2 processed items, got {}", - processed.len() + results[0].graph.as_deref(), + Some("ad4m://graph/test_preserve"), + "Graph field should be preserved in query_links results" ); } #[test] - fn test_unprocessed_items_data_query_with_reifier() { - // Tests Query 3 from unprocessedItems — fetching full item data - // including reifier metadata (author, timestamp) + fn test_graph_field_preserved_through_get_all_links() { let svc = new_service(); - let (ch_id, _, msg_ids) = setup_conversation_channel(&svc, "flux://ch-data", true, 2); - - let values_clause = msg_ids - .iter() - .map(|id| format!("<{}>", id)) - .collect::>() - .join(" "); - let data_query = format!( - r#"PREFIX rdf: - SELECT ?id ?author ?timestamp ?type ?body WHERE {{ - VALUES ?id {{ {} }} - <{}> ?id . - ?_reifier rdf:reifies <<( <{}> ?id )>> . - ?_reifier ?author . - ?_reifier ?timestamp . - ?id ?type . - FILTER(?type IN (, , )) - OPTIONAL {{ ?id ?body . }} - }} - ORDER BY ?timestamp"#, - values_clause, ch_id, ch_id - ); + let mut link = make_link("ad4m://src", "ad4m://pred", "ad4m://tgt"); + link.graph = Some("ad4m://graph/test_all".to_string()); + svc.add_link(&link).unwrap(); - let result = svc.query(&data_query).unwrap(); - let rows: Vec = serde_json::from_str(&result).unwrap(); + let results = svc.get_all_links().unwrap(); + assert_eq!(results.len(), 1); assert_eq!( - rows.len(), - 2, - "Expected 2 data rows, got {}. Raw: {}", - rows.len(), - result + results[0].graph.as_deref(), + Some("ad4m://graph/test_all"), + "Graph field should be preserved in get_all_links results" ); - // Verify author and timestamp are populated from reifier - for row in &rows { - assert!( - row["author"].as_str().is_some() && !row["author"].as_str().unwrap().is_empty(), - "Author should be populated from reifier. Row: {:?}", - row - ); - assert!( - row["timestamp"].as_str().is_some() - && !row["timestamp"].as_str().unwrap().is_empty(), - "Timestamp should be populated from reifier. Row: {:?}", - row - ); - } } #[test] - fn test_recent_conversations_multiple_channels_ordering() { - // Two conversation channels with different last-activity times. - // The one with more recent messages should come first. + fn test_named_graph_lifecycle() { let svc = new_service(); - // Channel 1: older messages - let ch1 = "flux://ch-old"; - svc.add_link(&make_link(ch1, "flux://entry_type", "flux://has_channel")) - .unwrap(); - svc.add_link(&make_link(ch1, "flux://channel_is_conversation", "true")) - .unwrap(); - let msg1 = make_link_with_ts( - ch1, - "ad4m://has_child", - "flux://ch-old-msg", - "2026-01-01T01:00:00.000Z", - "did:key:alice", + // Initially no named graphs + assert!( + !svc.has_named_graphs(), + "Should have no named graphs initially" ); - svc.add_link(&msg1).unwrap(); - svc.add_link(&make_link( - "flux://ch-old-msg", - "flux://entry_type", - "flux://has_message", - )) - .unwrap(); + assert!(svc.named_graphs().unwrap().is_empty()); - // Channel 2: newer messages - let ch2 = "flux://ch-new"; - svc.add_link(&make_link(ch2, "flux://entry_type", "flux://has_channel")) - .unwrap(); - svc.add_link(&make_link(ch2, "flux://channel_is_conversation", "true")) + // Create a named graph + svc.create_named_graph("ad4m://graph/lifecycle_test") .unwrap(); - let msg2 = make_link_with_ts( - ch2, - "ad4m://has_child", - "flux://ch-new-msg", - "2026-01-15T10:00:00.000Z", - "did:key:bob", + assert!( + svc.has_named_graphs(), + "Should have named graphs after creation" ); - svc.add_link(&msg2).unwrap(); - svc.add_link(&make_link( - "flux://ch-new-msg", - "flux://entry_type", - "flux://has_message", - )) - .unwrap(); + assert!(svc.contains_named_graph("ad4m://graph/lifecycle_test")); + assert_eq!(svc.named_graphs().unwrap().len(), 1); - let query = r#" - PREFIX rdf: - SELECT ?channelId (SAMPLE(?cId) AS ?conversationId) (MAX(?ts) AS ?lastActivity) WHERE { - ?channelId . - ?channelId ?_isConv . - FILTER(STR((?_isConv)) = "true") - OPTIONAL { - ?channelId ?cId . - ?cId . - } - OPTIONAL { - ?channelId ?item . - ?_itemReifier rdf:reifies <<( ?channelId ?item )>> . - ?_itemReifier ?itemTs . - ?item ?itemType . - FILTER(?itemType IN (, )) - } - OPTIONAL { - ?_chanReifier rdf:reifies <<( ?_parent ?channelId )>> . - ?_chanReifier ?chanCreatedTs . - } - BIND(COALESCE(?itemTs, ?chanCreatedTs, "1970-01-01T00:00:00Z"^^) AS ?ts) - } - GROUP BY ?channelId - ORDER BY DESC(?lastActivity) - LIMIT 20 - "#; + // Create is idempotent + svc.create_named_graph("ad4m://graph/lifecycle_test") + .unwrap(); + assert_eq!(svc.named_graphs().unwrap().len(), 1); - let result = svc.query(query).unwrap(); - let rows: Vec = serde_json::from_str(&result).unwrap(); - assert_eq!(rows.len(), 2, "Expected 2 channels. Raw: {}", result); - // Newer channel should come first - assert_eq!( - rows[0]["channelId"].as_str().unwrap(), - "flux://ch-new", - "Newer channel should be first. Got: {:?}", - rows - ); - assert_eq!( - rows[1]["channelId"].as_str().unwrap(), - "flux://ch-old", - "Older channel should be second. Got: {:?}", - rows - ); + // Remove + svc.remove_named_graph_and_quads("ad4m://graph/lifecycle_test") + .unwrap(); + assert!(!svc.contains_named_graph("ad4m://graph/lifecycle_test")); + + // Remove non-existent is no-op + svc.remove_named_graph_and_quads("ad4m://graph/nonexistent") + .unwrap(); } #[test] - fn test_conversation_has_child_subscription_direct_triple() { - // Verify that ` ` exists as a direct - // triple (required for SPARQL subscription matching) + fn test_graph_aware_remove_link() { let svc = new_service(); - let link = make_link("flux://ch-sub", "ad4m://has_child", "flux://msg-1"); - svc.add_link(&link).unwrap(); - let query = r#"SELECT ?id WHERE { - ?id . - }"#; - let result = svc.query(query).unwrap(); - let rows: Vec = serde_json::from_str(&result).unwrap(); - assert_eq!(rows.len(), 1); - assert_eq!(rows[0]["id"].as_str().unwrap(), "flux://msg-1"); - } + // Add link to named graph + let mut link = make_link("ad4m://src", "ad4m://pred", "ad4m://tgt"); + link.graph = Some("ad4m://graph/remove_test".to_string()); + svc.add_link(&link).unwrap(); - #[test] - fn test_parse_literal_custom_function_in_sparql() { - // Verify the custom function works when called through SPARQL - let svc = new_service(); - svc.add_link(&make_link( - "flux://test", - "flux://prop", - "literal:string:hello", - )) - .unwrap(); + // Add same (s,p,o) to default graph with different reifier + let mut default_link = make_link("ad4m://src", "ad4m://pred", "ad4m://tgt"); + default_link.timestamp = "2024-01-15T11:00:00.000Z".to_string(); + default_link.proof.signature = "sig_other".to_string(); + svc.add_link(&default_link).unwrap(); - let query = r#"SELECT ?val WHERE { - ?raw . - BIND(STR((?raw)) AS ?val) - }"#; - let result = svc.query(query).unwrap(); - let rows: Vec = serde_json::from_str(&result).unwrap(); - assert_eq!(rows.len(), 1, "Expected 1 result. Raw: {}", result); assert_eq!( - rows[0]["val"].as_str().unwrap(), - "hello", - "parse_literal should strip literal:string: prefix" + svc.query_links(None, None, None, None, None, None) + .unwrap() + .len(), + 2, + "Should have 2 links before removal" ); - } - #[test] - fn test_parse_literal_bare_true_in_filter() { - // Verify bare "true" (NamedNode) passes FILTER(STR(parse_literal(...)) = "true") - let svc = new_service(); - svc.add_link(&make_link("flux://item", "flux://flag", "true")) - .unwrap(); + // Remove only the named graph link + svc.remove_link(&link).unwrap(); - let query = r#"SELECT ?s WHERE { - ?s ?val . - FILTER(STR((?val)) = "true") - }"#; - let result = svc.query(query).unwrap(); - let rows: Vec = serde_json::from_str(&result).unwrap(); + let remaining = svc.query_links(None, None, None, None, None, None).unwrap(); assert_eq!( - rows.len(), + remaining.len(), 1, - "Bare 'true' should pass parse_literal filter. Raw: {}", - result + "Should have 1 link after graph-scoped removal" + ); + assert!( + remaining[0].graph.is_none(), + "Remaining link should be in default graph" ); } #[test] - fn test_group_by_with_reifier_and_optional() { - // Isolated test: GROUP BY + OPTIONAL with reifier pattern. - // This is the specific combination used in recentConversations. + fn test_default_graph_link_has_no_graph_field() { let svc = new_service(); - // Add two items to one group - let link1 = make_link_with_ts( - "flux://group1", - "flux://has_item", - "flux://item1", - "2026-01-15T10:00:00.000Z", - "did:key:alice", - ); - svc.add_link(&link1).unwrap(); + let link = make_link("ad4m://src", "ad4m://pred", "ad4m://tgt"); + svc.add_link(&link).unwrap(); - let link2 = make_link_with_ts( - "flux://group1", - "flux://has_item", - "flux://item2", - "2026-01-15T11:00:00.000Z", - "did:key:bob", + let results = svc.query_links(None, None, None, None, None, None).unwrap(); + assert_eq!(results.len(), 1); + assert!( + results[0].graph.is_none(), + "Default graph links should have graph = None" ); - svc.add_link(&link2).unwrap(); - - let query = r#" - PREFIX rdf: - SELECT ?groupId (MAX(?ts) AS ?lastTs) WHERE { - ?groupId ?item . - OPTIONAL { - ?_reifier rdf:reifies <<( ?groupId ?item )>> . - ?_reifier ?ts . - } - } - GROUP BY ?groupId - "#; + } - let result = svc.query(query).unwrap(); - let rows: Vec = serde_json::from_str(&result).unwrap(); - assert_eq!(rows.len(), 1, "Expected 1 group. Raw: {}", result); + #[test] + fn test_make_graph_iri() { assert_eq!( - rows[0]["groupId"].as_str().unwrap(), - "flux://group1", - "Group ID mismatch" + make_graph_iri("expr://abc123"), + "ad4m://graph/expr://abc123" ); - // MAX timestamp should be the later one - let last_ts = rows[0]["lastTs"].as_str().unwrap_or(""); + assert_eq!( + make_graph_iri("literal://string:hello"), + "ad4m://graph/literal://string:hello" + ); + } + + #[test] + fn test_query_nonexistent_graph_returns_empty() { + let svc = new_service(); + + // Add a link to default graph + let link = make_link("ad4m://src", "ad4m://pred", "ad4m://tgt"); + svc.add_link(&link).unwrap(); + + // Query a non-existent graph should return empty + let result = svc + .query_with_graphs( + "SELECT ?s ?p ?o WHERE { ?s ?p ?o }", + Some(&["ad4m://graph/nonexistent".to_string()]), + ) + .unwrap(); assert!( - !last_ts.is_empty(), - "MAX timestamp should be present. Raw: {}", - result + !result.contains("ad4m://src"), + "Querying a non-existent named graph should not return default graph data" ); } } diff --git a/rust-executor/src/prolog_service/engine_pool.rs b/rust-executor/src/prolog_service/engine_pool.rs index 4570a029e..de6406391 100644 --- a/rust-executor/src/prolog_service/engine_pool.rs +++ b/rust-executor/src/prolog_service/engine_pool.rs @@ -1047,6 +1047,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }); } @@ -1146,6 +1147,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }]; let result = pool .update_all_engines_with_links("test".to_string(), test_links, None, None) @@ -1181,6 +1183,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user2".to_string(), @@ -1197,6 +1200,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, ]; @@ -1269,6 +1273,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }]; pool.update_all_engines_with_links("test".to_string(), test_links, None, None) .await @@ -1335,6 +1340,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user2".to_string(), @@ -1351,6 +1357,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, ]; @@ -1517,6 +1524,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user2".to_string(), @@ -1533,6 +1541,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user3".to_string(), @@ -1549,6 +1558,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, ]; @@ -1722,6 +1732,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "item1".to_string(), @@ -1738,6 +1749,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "other_user".to_string(), @@ -1754,6 +1766,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, ]; @@ -1873,6 +1886,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }); // Use the PRODUCTION method that properly sets up the pool state @@ -1998,6 +2012,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user2".to_string(), @@ -2014,6 +2029,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, ]); @@ -2238,6 +2254,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }]; // Use the PRODUCTION method that properly sets up the pool state @@ -2377,6 +2394,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user1".to_string(), @@ -2393,6 +2411,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, ]; @@ -2450,6 +2469,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user1".to_string(), @@ -2466,6 +2486,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, ]); @@ -2487,6 +2508,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, // SDNA Prolog code: subject class name -> ad4m://sdna -> literal with actual subject_class rule DecoratedLinkExpression { @@ -2504,6 +2526,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, ]); @@ -2651,6 +2674,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }]; pool.update_all_engines_with_links("facts".to_string(), test_links, None, None) @@ -2806,6 +2830,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user1".to_string(), @@ -2822,6 +2847,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user1".to_string(), @@ -2838,6 +2864,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, // User1 profile DecoratedLinkExpression { @@ -2855,6 +2882,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, ]; @@ -3137,6 +3165,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }]; pool.update_all_engines_with_links("test_facts".to_string(), test_links, None, None) @@ -3308,6 +3337,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }]; pool.update_all_engines_with_links("facts".to_string(), test_links, None, None) @@ -3642,6 +3672,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }]; pool.update_all_engines_with_links("facts".to_string(), test_links, None, None) @@ -3698,6 +3729,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user2".to_string(), @@ -3714,6 +3746,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, ]; diff --git a/rust-executor/src/prolog_service/filtered_pool.rs b/rust-executor/src/prolog_service/filtered_pool.rs index e2700ae45..4f49ac0e8 100644 --- a/rust-executor/src/prolog_service/filtered_pool.rs +++ b/rust-executor/src/prolog_service/filtered_pool.rs @@ -799,6 +799,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user2".to_string(), @@ -815,6 +816,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "user1".to_string(), @@ -831,6 +833,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }, ] } @@ -855,6 +858,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }); } links diff --git a/rust-executor/src/prolog_service/mod.rs b/rust-executor/src/prolog_service/mod.rs index d8934895c..575400930 100644 --- a/rust-executor/src/prolog_service/mod.rs +++ b/rust-executor/src/prolog_service/mod.rs @@ -813,6 +813,7 @@ mod prolog_test { invalid: Some(false), }, status: None, + graph: None, }, DecoratedLinkExpression { author: "test_author".to_string(), @@ -829,6 +830,7 @@ mod prolog_test { invalid: Some(false), }, status: None, + graph: None, }, ]; diff --git a/rust-executor/src/prolog_service/sdna_pool.rs b/rust-executor/src/prolog_service/sdna_pool.rs index a7f81b3f8..61930ccfe 100644 --- a/rust-executor/src/prolog_service/sdna_pool.rs +++ b/rust-executor/src/prolog_service/sdna_pool.rs @@ -420,6 +420,7 @@ mod tests { invalid: Some(false), }, status: None, + graph: None, }]; // Set up test data in the complete pool first diff --git a/rust-executor/src/types/core.rs b/rust-executor/src/types/core.rs index db09a1ab2..ddb99b888 100644 --- a/rust-executor/src/types/core.rs +++ b/rust-executor/src/types/core.rs @@ -150,6 +150,8 @@ pub struct LinkExpression { pub data: Link, pub proof: ExpressionProof, pub status: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub graph: Option, } impl TryFrom for LinkExpression { @@ -170,6 +172,7 @@ impl TryFrom for LinkExpression { signature: input.proof.signature.ok_or(anyhow!("Key is required"))?, }, status: input.status, + graph: input.graph, }) } } @@ -188,6 +191,7 @@ impl LinkExpression { data, proof: ExpressionProof::default(), status: input.status, + graph: input.graph, } } } @@ -211,6 +215,7 @@ impl From> for LinkExpression { data: expr.data.normalize(), proof: expr.proof, status: None, + graph: None, } } } @@ -225,6 +230,8 @@ pub struct DecoratedLinkExpression { pub proof: DecoratedExpressionProof, #[serde(skip_serializing_if = "Option::is_none")] pub status: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub graph: Option, } impl DecoratedLinkExpression { @@ -246,6 +253,7 @@ impl DecoratedLinkExpression { impl From<(LinkExpression, LinkStatus)> for DecoratedLinkExpression { fn from((expr, status): (LinkExpression, LinkStatus)) -> Self { + let graph = expr.graph.clone(); let mut expr: Expression = expr.into(); expr.data = expr.data.normalize(); let verified_expr: VerifiedExpression = expr.into(); @@ -255,6 +263,7 @@ impl From<(LinkExpression, LinkStatus)> for DecoratedLinkExpression { data: verified_expr.data, proof: verified_expr.proof, status: Some(status), + graph, } } } @@ -270,6 +279,7 @@ impl From for LinkExpression { signature: decorated.proof.signature, }, status: decorated.status, + graph: decorated.graph, } } } diff --git a/rust-executor/src/types/domain.rs b/rust-executor/src/types/domain.rs index 5affca7a0..75220df28 100644 --- a/rust-executor/src/types/domain.rs +++ b/rust-executor/src/types/domain.rs @@ -296,6 +296,7 @@ pub struct LinkExpressionInput { pub proof: ExpressionProofInput, pub timestamp: String, pub status: Option, + pub graph: Option, } #[derive(Default, Debug, Deserialize, Serialize, Clone)] @@ -434,6 +435,7 @@ impl TryFrom for DecoratedLinkExpression { invalid: input.proof.invalid, }, status: input.status, + graph: input.graph, }) } }