-
Notifications
You must be signed in to change notification settings - Fork 18
Unified CEL context and on_reply become Tasks #377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: task-dependencies
Are you sure you want to change the base?
Changes from all commits
fd9455e
68416b1
24960a8
b23c9ee
5efd4ab
49ff5b4
eda890d
2394d94
73ae484
66e05aa
1ff92da
49318ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,13 @@ | ||
| use cel::Value; | ||
| use cel::{Context, Env, Value}; | ||
| use std::cell::OnceCell; | ||
| use std::collections::{BTreeMap, HashMap}; | ||
| use std::collections::{BTreeMap, HashMap, HashSet}; | ||
| use std::sync::Arc; | ||
| use tracing::{debug, warn}; | ||
| use tracing::{debug, error, warn}; | ||
|
|
||
| use crate::data::attribute::{wasm_prop, AttributeError, AttributeState, AttributeValue, Path}; | ||
| use crate::data::{Expression, Headers}; | ||
| use crate::kuadrant::cache::{AttributeCache, CachedValue}; | ||
| use crate::kuadrant::pipeline::tasks::Task; | ||
| use crate::kuadrant::resolver::{AttributeResolver, ProxyWasmHost}; | ||
| use crate::services::ServiceError; | ||
| use tracing_opentelemetry::OpenTelemetrySpanExt; | ||
|
|
@@ -28,6 +29,7 @@ pub struct ReqRespCtx { | |
| tracker: Tracker, | ||
| stored_values: BTreeMap<String, Value>, | ||
| pub barrier: Barrier, | ||
| pub cel: CelScope, | ||
| } | ||
|
|
||
| impl Default for ReqRespCtx { | ||
|
|
@@ -49,6 +51,7 @@ impl ReqRespCtx { | |
| tracker: Tracker::default(), | ||
| stored_values: BTreeMap::new(), | ||
| barrier: Barrier::default(), | ||
| cel: CelScope::default(), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -467,7 +470,7 @@ impl Barrier { | |
| match self.count.checked_sub(1) { | ||
| Some(new_value) => self.count = new_value, | ||
| None => { | ||
| tracing::error!( | ||
| error!( | ||
| "Attempted to lower upstream barrier when count is already 0 - mismatched raise/lower pairs" | ||
| ); | ||
| } | ||
|
|
@@ -514,6 +517,64 @@ impl BodyContext { | |
| } | ||
| } | ||
|
|
||
| pub struct CelScope { | ||
| env: Arc<Env>, | ||
| registered: HashSet<String>, | ||
| bindings: BTreeMap<String, Vec<(String, Value)>>, | ||
| } | ||
|
|
||
| impl Default for CelScope { | ||
| fn default() -> Self { | ||
| Self { | ||
| env: Arc::new(Env::stdlib()), | ||
| registered: HashSet::new(), | ||
| bindings: BTreeMap::new(), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl CelScope { | ||
| pub fn new_ctx(&mut self, task: &dyn Task) -> Context<'static> { | ||
| let task_id = task.id(); | ||
|
|
||
| if !self.registered.contains(task_id) { | ||
| let types = task.cel_types(); | ||
| if !types.is_empty() { | ||
| match Arc::get_mut(&mut self.env) { | ||
| Some(env) => { | ||
| for type_def in types { | ||
| env.add_struct(type_def); | ||
| } | ||
| } | ||
| None => error!("Failed to add CEL types: Arc refcount > 1"), | ||
| } | ||
| } | ||
| self.registered.insert(task_id.to_string()); | ||
|
Comment on lines
+540
to
+552
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do not mark a task as registered when CEL type registration failed. If Suggested fix if !self.registered.contains(task_id) {
let types = task.cel_types();
+ let mut registration_succeeded = true;
if !types.is_empty() {
match Arc::get_mut(&mut self.env) {
Some(env) => {
for type_def in types {
env.add_struct(type_def);
}
}
- None => error!("Failed to add CEL types: Arc refcount > 1"),
+ None => {
+ error!("Failed to add CEL types: Arc refcount > 1");
+ registration_succeeded = false;
+ }
}
}
- self.registered.insert(task_id.to_string());
+ if registration_succeeded {
+ self.registered.insert(task_id.to_string());
+ }
}🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| let mut ctx = Context::with_env(Arc::clone(&self.env)); | ||
| for (scope_id, scope_bindings) in &self.bindings { | ||
| if is_ancestor(scope_id, task_id) { | ||
| for (name, value) in scope_bindings { | ||
| ctx.add_variable_from_value(name, value.clone()); | ||
| } | ||
| } | ||
| } | ||
| ctx | ||
| } | ||
|
|
||
| pub fn add_scoped_binding(&mut self, task_id: &str, name: String, val: Value) { | ||
| self.bindings | ||
| .entry(task_id.to_string()) | ||
| .or_default() | ||
| .push((name, val)); | ||
| } | ||
| } | ||
|
|
||
| fn is_ancestor(scope_id: &str, task_id: &str) -> bool { | ||
| task_id == scope_id || task_id.starts_with(&format!("{}.", scope_id)) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
|
|
@@ -754,4 +815,66 @@ mod tests { | |
| Ok(AttributeState::Available(Some(ref s))) if s == "external-user-id" | ||
| )); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_cel_scope_hierarchical_bindings() { | ||
| use crate::kuadrant::pipeline::tasks::{Task, TaskOutcome}; | ||
|
|
||
| struct MockTask { | ||
| id: String, | ||
| } | ||
| impl Task for MockTask { | ||
| fn id(&self) -> &str { | ||
| &self.id | ||
| } | ||
| fn apply(self: Box<Self>, _ctx: &mut ReqRespCtx) -> TaskOutcome { | ||
| TaskOutcome::Done | ||
| } | ||
| } | ||
|
|
||
| let mut scope = CelScope::default(); | ||
|
|
||
| // Task "0" gets a binding | ||
| scope.add_scoped_binding( | ||
| "0", | ||
| "my_response".to_string(), | ||
| Value::String(Arc::new("user123".to_string())), | ||
| ); | ||
|
|
||
| // Task "0.0" should see "my_response" from parent "0" | ||
| let task_0_0 = MockTask { | ||
| id: "0.0".to_string(), | ||
| }; | ||
| let ctx_0_0 = scope.new_ctx(&task_0_0); | ||
| assert!(ctx_0_0.get_variable("my_response").is_some()); | ||
|
|
||
| // Task "1" should NOT see "my_response" (different branch) | ||
| let task_1 = MockTask { | ||
| id: "1".to_string(), | ||
| }; | ||
| let ctx_1 = scope.new_ctx(&task_1); | ||
| assert!(ctx_1.get_variable("my_response").is_none()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_is_ancestor() { | ||
| // Same task | ||
| assert!(is_ancestor("0", "0")); | ||
|
|
||
| // Direct child | ||
| assert!(is_ancestor("0", "0.0")); | ||
|
|
||
| // Nested child | ||
| assert!(is_ancestor("0", "0.0.1")); | ||
|
|
||
| // Different branch | ||
| assert!(!is_ancestor("0", "1")); | ||
| assert!(!is_ancestor("0", "1.0")); | ||
|
|
||
| // Sibling | ||
| assert!(!is_ancestor("0.0", "0.1")); | ||
|
|
||
| // Parent relationship is not symmetric | ||
| assert!(!is_ancestor("0.0", "0")); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not reuse one mutable CEL
Contextacross different predicates.Expression::eval()mutates the passed context by adding functions and variable bindings, and nothing removes them afterwards. Reusing a singleContexthere makes predicate evaluation order-dependent: an earlier predicate can leavequeryMapor other bindings behind for a later predicate that should have been evaluated in a clean context.Suggested fix
📝 Committable suggestion
🤖 Prompt for AI Agents