From fd9455e321f3ff67d8a964dab18bbb0129f81139 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Fri, 29 May 2026 11:04:19 +0100 Subject: [PATCH 01/12] Remove implicit sequential dependency from typed action Signed-off-by: Adam Cattermole --- src/kuadrant/pipeline/blueprint.rs | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/kuadrant/pipeline/blueprint.rs b/src/kuadrant/pipeline/blueprint.rs index 291fe9e3..d4ccc90e 100644 --- a/src/kuadrant/pipeline/blueprint.rs +++ b/src/kuadrant/pipeline/blueprint.rs @@ -185,13 +185,13 @@ impl Blueprint { .enumerate() .map(|(i, action_config)| { let id = i.to_string(); - let dependencies = if i > 0 { - vec![(i - 1).to_string()] - } else { - vec![] - }; match action_config { configuration::ActionConfig::Legacy(action) => { + let dependencies = if i > 0 { + vec![(i - 1).to_string()] + } else { + vec![] + }; let legacy_request_data: Vec<((String, String), String)> = request_data .iter() .map(|(key, expr)| (key.clone(), expr.source().to_string())) @@ -199,7 +199,7 @@ impl Blueprint { Action::compile(action, services, id, dependencies, &legacy_request_data) } configuration::ActionConfig::Typed(typed) => { - Action::compile_typed(typed, services, id, dependencies) + Action::compile_typed(typed, services, id, vec![]) } } }) @@ -435,12 +435,7 @@ impl Action { .enumerate() .map(|(idx, typed_action)| { let reply_id = format!("{}.{}", id, idx); - let reply_deps = if idx > 0 { - vec![format!("{}.{}", id, idx - 1)] - } else { - vec![] - }; - Action::compile_typed(typed_action, services, reply_id, reply_deps) + Action::compile_typed(typed_action, services, reply_id, vec![]) }) .collect::>()?; @@ -1027,6 +1022,7 @@ mod tests { assert!(matches!(service, ServiceInstance::Dynamic(_))); assert_eq!(on_reply.len(), 1); } - assert_eq!(blueprint.actions[1].dependencies, vec!["0"]); + + assert!(blueprint.actions[1].dependencies.is_empty()); } } From 68416b10d6ca94a3329f9ead64e4f09e07a3c588 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 9 Jun 2026 12:02:48 +0100 Subject: [PATCH 02/12] Add task_id to action type tasks Signed-off-by: Adam Cattermole --- src/kuadrant/pipeline/blueprint.rs | 4 ++++ src/kuadrant/pipeline/tasks/dynamic.rs | 1 + src/kuadrant/pipeline/tasks/headers.rs | 17 +++++++++++++- src/kuadrant/pipeline/tasks/send_reply.rs | 27 +++++++++++++++++++---- src/kuadrant/pipeline/tasks/store.rs | 9 ++++++++ 5 files changed, 53 insertions(+), 5 deletions(-) diff --git a/src/kuadrant/pipeline/blueprint.rs b/src/kuadrant/pipeline/blueprint.rs index d4ccc90e..2725bcaf 100644 --- a/src/kuadrant/pipeline/blueprint.rs +++ b/src/kuadrant/pipeline/blueprint.rs @@ -247,6 +247,7 @@ impl Blueprint { ServiceInstance::Tracing(tracing_service) => { ctx.set_public_tracker_id(var.clone()); tasks.push(Box::new(ModifyHeadersTask::new( + action.id.clone(), HeaderOperation::Append( vec![(var.clone(), ctx.request_id().to_string())].into(), ), @@ -303,6 +304,7 @@ impl Blueprint { Operation::Deny { deny_with } => { use crate::kuadrant::pipeline::tasks::SendReplyTask; let task = SendReplyTask::new_deferred( + action.id.clone(), action.predicate.clone(), deny_with.clone(), action.terminal, @@ -314,6 +316,7 @@ impl Blueprint { headers: headers_expr, } => { let task = ModifyHeadersTask::new_deferred( + action.id.clone(), action.predicate.clone(), headers_expr.clone(), target.clone(), @@ -328,6 +331,7 @@ impl Blueprint { } => { use crate::kuadrant::pipeline::tasks::StoreTask; match StoreTask::new( + action.id.clone(), action.predicate.clone(), expression.clone(), path.clone(), diff --git a/src/kuadrant/pipeline/tasks/dynamic.rs b/src/kuadrant/pipeline/tasks/dynamic.rs index e7e6d2f9..e852d886 100644 --- a/src/kuadrant/pipeline/tasks/dynamic.rs +++ b/src/kuadrant/pipeline/tasks/dynamic.rs @@ -275,6 +275,7 @@ fn process_dynamic_response( let pairs = cel_value_to_header_pairs(val); if !pairs.is_empty() { tasks.push(Box::new(ModifyHeadersTask::new( + "inline_response_header".to_string(), HeaderOperation::Set(pairs.into()), target.clone(), ))); diff --git a/src/kuadrant/pipeline/tasks/headers.rs b/src/kuadrant/pipeline/tasks/headers.rs index d9330dae..39e53d53 100644 --- a/src/kuadrant/pipeline/tasks/headers.rs +++ b/src/kuadrant/pipeline/tasks/headers.rs @@ -36,6 +36,7 @@ enum HeadersMode { #[derive(Clone)] pub struct ModifyHeadersTask { + task_id: String, predicate: Option, mode: HeadersMode, target: HeadersType, @@ -56,8 +57,13 @@ impl Clone for HeadersMode { } impl ModifyHeadersTask { - pub fn new(operation: HeaderOperation, target: HeadersType) -> ModifyHeadersTask { + pub fn new( + task_id: String, + operation: HeaderOperation, + target: HeadersType, + ) -> ModifyHeadersTask { ModifyHeadersTask { + task_id, predicate: None, mode: HeadersMode::Concrete { operation }, target, @@ -66,12 +72,14 @@ impl ModifyHeadersTask { } pub fn new_deferred( + task_id: String, predicate: Predicate, headers_expr: Expression, target: HeadersType, terminal: bool, ) -> Self { Self { + task_id, predicate: Some(predicate), mode: HeadersMode::Deferred { headers_expr }, target, @@ -81,6 +89,10 @@ impl ModifyHeadersTask { } impl Task for ModifyHeadersTask { + fn id(&self) -> Option { + Some(self.task_id.clone()) + } + fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { if let Some(ref predicate) = self.predicate { match predicate.test(ctx) { @@ -189,6 +201,7 @@ mod tests { let new_headers: Headers = vec![("New-Key".to_string(), "New-Value".to_string())].into(); let task = Box::new(ModifyHeadersTask::new( + "0".to_string(), HeaderOperation::Append(new_headers), HeadersType::HttpRequestHeaders, )); @@ -222,6 +235,7 @@ mod tests { vec![("Content-Type".to_string(), "application/json".to_string())].into(); let task = Box::new(ModifyHeadersTask::new( + "0".to_string(), HeaderOperation::Set(new_headers), HeadersType::HttpRequestHeaders, )); @@ -254,6 +268,7 @@ mod tests { let keys_to_remove = vec!["API-Key-To-Remove".to_string()]; let task = Box::new(ModifyHeadersTask::new( + "0".to_string(), HeaderOperation::Remove(keys_to_remove), HeadersType::HttpResponseHeaders, )); diff --git a/src/kuadrant/pipeline/tasks/send_reply.rs b/src/kuadrant/pipeline/tasks/send_reply.rs index 547f878f..0a3c2685 100644 --- a/src/kuadrant/pipeline/tasks/send_reply.rs +++ b/src/kuadrant/pipeline/tasks/send_reply.rs @@ -13,13 +13,19 @@ use crate::metrics::METRICS; use crate::services::{cel_value_to_header_pairs, deny_response_struct_def}; pub struct SendReplyTask { + task_id: String, predicate: Option, deny_with: Expression, terminal: bool, } impl SendReplyTask { - pub fn new(status_code: u32, headers: Vec<(String, String)>, body: Option) -> Self { + pub fn new( + task_id: String, + status_code: u32, + headers: Vec<(String, String)>, + body: Option, + ) -> Self { let headers = headers .into_iter() .map(|(h, v)| format!("['''{h}''', '''{v}''']")) @@ -32,14 +38,21 @@ impl SendReplyTask { #[allow(clippy::expect_used)] let deny_with = Expression::new(&expr).expect("Needs to be valid CEL!"); Self { + task_id, predicate: None, deny_with, terminal: false, } } - pub fn new_deferred(predicate: Predicate, deny_with: Expression, terminal: bool) -> Self { + pub fn new_deferred( + task_id: String, + predicate: Predicate, + deny_with: Expression, + terminal: bool, + ) -> Self { Self { + task_id, predicate: Some(predicate), deny_with, terminal, @@ -48,6 +61,7 @@ impl SendReplyTask { pub fn default() -> Self { Self::new( + "default".to_string(), 500, Vec::new(), Some("Internal Server Error.\n".to_string()), @@ -81,11 +95,15 @@ impl TryFrom for SendReplyTask { .map(|v| cel_value_to_header_pairs(&v)) .unwrap_or_default(); - Ok(Self::new(status, headers, body)) + Ok(Self::new("from_value".to_string(), status, headers, body)) } } impl Task for SendReplyTask { + fn id(&self) -> Option { + Some(self.task_id.clone()) + } + #[tracing::instrument(name = "send_reply", skip(self, ctx))] fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { if let Some(ref predicate) = self.predicate { @@ -186,6 +204,7 @@ mod tests { let mut ctx = ReqRespCtx::new(Arc::new(mock_host)); let task = Box::new(SendReplyTask::new( + "0".to_string(), 403, vec![ ("content-type".to_string(), "text/plain".to_string()), @@ -206,7 +225,7 @@ mod tests { let mock_host = MockWasmHost::new(); let mut ctx = ReqRespCtx::new(Arc::new(mock_host)); - let task = Box::new(SendReplyTask::new(429, vec![], None)); + let task = Box::new(SendReplyTask::new("0".to_string(), 429, vec![], None)); let outcome = task.apply(&mut ctx); assert!(matches!(outcome, TaskOutcome::Done)); diff --git a/src/kuadrant/pipeline/tasks/store.rs b/src/kuadrant/pipeline/tasks/store.rs index 5a46014c..9cfe5d29 100644 --- a/src/kuadrant/pipeline/tasks/store.rs +++ b/src/kuadrant/pipeline/tasks/store.rs @@ -16,6 +16,7 @@ enum BodySource { } pub struct StoreTask { + task_id: String, predicate: Option, expression: Expression, path: String, @@ -26,6 +27,7 @@ pub struct StoreTask { impl StoreTask { pub fn new( + task_id: String, predicate: Predicate, expression: Expression, path: String, @@ -34,6 +36,7 @@ impl StoreTask { ) -> Result { let body_parser = create_body_parser(&predicate, &expression)?; Ok(Self { + task_id, predicate: Some(predicate), expression, path, @@ -77,6 +80,10 @@ fn create_body_parser( } impl Task for StoreTask { + fn id(&self) -> Option { + Some(self.task_id.clone()) + } + #[tracing::instrument(name = "store", skip(self, ctx), level = tracing::Level::TRACE)] fn apply(mut self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { if let Some((ref source, ref mut parser)) = self.body_parser { @@ -202,6 +209,7 @@ mod tests { fn make_store_task(predicate: &str, expression: &str, path: &str) -> Box { Box::new( StoreTask::new( + "0".to_string(), Predicate::new(predicate).unwrap(), Expression::new(expression).unwrap(), path.to_string(), @@ -333,6 +341,7 @@ mod tests { fn invalid_json_pointer_fails_task_creation() { // Invalid JSON pointer format - acutejson expects RFC 6901 format let result = StoreTask::new( + "0".to_string(), Predicate::new("true").unwrap(), Expression::new("requestBodyJSON('not-a-valid-pointer')").unwrap(), "some.path".to_string(), From 24960a8ce61f7f725f7b23860660eaf82096a530 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 9 Jun 2026 12:38:40 +0100 Subject: [PATCH 03/12] Task id is no longer optional Signed-off-by: Adam Cattermole --- src/kuadrant/pipeline/executor.rs | 22 +++++++------------ src/kuadrant/pipeline/tasks/dynamic.rs | 6 ++--- src/kuadrant/pipeline/tasks/failure_mode.rs | 2 +- src/kuadrant/pipeline/tasks/headers.rs | 4 ++-- src/kuadrant/pipeline/tasks/mod.rs | 12 +++++----- src/kuadrant/pipeline/tasks/send_reply.rs | 4 ++-- src/kuadrant/pipeline/tasks/store.rs | 4 ++-- src/kuadrant/pipeline/tasks/token_usage.rs | 4 ++++ .../pipeline/tasks/tracing_decorator.rs | 2 +- tests/response_body.rs | 2 +- 10 files changed, 31 insertions(+), 31 deletions(-) diff --git a/src/kuadrant/pipeline/executor.rs b/src/kuadrant/pipeline/executor.rs index e6492813..0ceb24ee 100644 --- a/src/kuadrant/pipeline/executor.rs +++ b/src/kuadrant/pipeline/executor.rs @@ -70,7 +70,7 @@ impl Pipeline { let is_guard = task.is_guard(); // Create a new PendingTask with no-op processor let pending = Box::new(PendingTask::new( - task.id().unwrap_or_default(), + task.id().to_string(), Box::new(noop_response_processor(token_id, is_guard)), is_guard, )) as Box; @@ -113,12 +113,10 @@ impl Pipeline { continue; } - let task_id = task.id(); + let task_id = task.id().to_string(); match task.apply(&mut self.ctx) { TaskOutcome::Done => { - if let Some(id) = task_id { - self.completed_tasks.insert(id); - } + self.completed_tasks.insert(task_id); } TaskOutcome::Deferred { token_id, pending } => { if self.deferred_tasks.insert(token_id, pending).is_some() { @@ -162,17 +160,13 @@ impl Pipeline { Ok(_) => {} Err(err) => error!("Failed to set gRPC response data: {}", err), }; - let task_id = pending.id(); + let task_id = pending.id().to_string(); match pending.apply(&mut self.ctx) { TaskOutcome::Done => { - if let Some(id) = task_id { - self.completed_tasks.insert(id); - } + self.completed_tasks.insert(task_id); } TaskOutcome::Requeued(tasks) => { - if let Some(id) = task_id { - self.completed_tasks.insert(id); - } + self.completed_tasks.insert(task_id); for task in tasks.into_iter().rev() { self.task_queue.insert(0, task); } @@ -271,8 +265,8 @@ mod tests { } } - fn id(&self) -> Option { - Some(self.id.clone()) + fn id(&self) -> &str { + &self.id } fn dependencies(&self) -> &[String] { diff --git a/src/kuadrant/pipeline/tasks/dynamic.rs b/src/kuadrant/pipeline/tasks/dynamic.rs index e852d886..7f638b1d 100644 --- a/src/kuadrant/pipeline/tasks/dynamic.rs +++ b/src/kuadrant/pipeline/tasks/dynamic.rs @@ -85,8 +85,8 @@ impl DynamicTask { } impl Task for DynamicTask { - fn id(&self) -> Option { - Some(self.task_id.clone()) + fn id(&self) -> &str { + &self.task_id } fn dependencies(&self) -> &[String] { @@ -165,7 +165,7 @@ impl Task for DynamicTask { TaskOutcome::Deferred { token_id, pending: Box::new(PendingTask::new( - self.task_id, + task_id.clone(), Box::new(move |ctx| { let outcome = process_dynamic_response( ctx, &service, &task_id, token_id, &name, &on_reply, diff --git a/src/kuadrant/pipeline/tasks/failure_mode.rs b/src/kuadrant/pipeline/tasks/failure_mode.rs index 97760a3a..4ea6472b 100644 --- a/src/kuadrant/pipeline/tasks/failure_mode.rs +++ b/src/kuadrant/pipeline/tasks/failure_mode.rs @@ -39,7 +39,7 @@ impl Task for FailureModeTask { } } - fn id(&self) -> Option { + fn id(&self) -> &str { self.task.id() } diff --git a/src/kuadrant/pipeline/tasks/headers.rs b/src/kuadrant/pipeline/tasks/headers.rs index 39e53d53..e4077d43 100644 --- a/src/kuadrant/pipeline/tasks/headers.rs +++ b/src/kuadrant/pipeline/tasks/headers.rs @@ -89,8 +89,8 @@ impl ModifyHeadersTask { } impl Task for ModifyHeadersTask { - fn id(&self) -> Option { - Some(self.task_id.clone()) + fn id(&self) -> &str { + &self.task_id } fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { diff --git a/src/kuadrant/pipeline/tasks/mod.rs b/src/kuadrant/pipeline/tasks/mod.rs index 74e60f90..f9c91f57 100644 --- a/src/kuadrant/pipeline/tasks/mod.rs +++ b/src/kuadrant/pipeline/tasks/mod.rs @@ -25,9 +25,7 @@ pub type ResponseProcessor = dyn FnOnce(&mut ReqRespCtx) -> TaskOutcome; pub trait Task { fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome; - fn id(&self) -> Option { - None - } + fn id(&self) -> &str; fn dependencies(&self) -> &[String] { &[] @@ -58,8 +56,8 @@ impl Task for PendingTask { fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { (self.process_response)(ctx) } - fn id(&self) -> Option { - Some(self.task_id.clone()) + fn id(&self) -> &str { + &self.task_id } fn is_guard(&self) -> bool { self.is_guard @@ -117,6 +115,10 @@ pub fn noop_response_processor( pub struct NoopTerminalTask; impl Task for NoopTerminalTask { + fn id(&self) -> &str { + "noop" + } + fn apply(self: Box, _ctx: &mut ReqRespCtx) -> TaskOutcome { TaskOutcome::Done } diff --git a/src/kuadrant/pipeline/tasks/send_reply.rs b/src/kuadrant/pipeline/tasks/send_reply.rs index 0a3c2685..b1b23721 100644 --- a/src/kuadrant/pipeline/tasks/send_reply.rs +++ b/src/kuadrant/pipeline/tasks/send_reply.rs @@ -100,8 +100,8 @@ impl TryFrom for SendReplyTask { } impl Task for SendReplyTask { - fn id(&self) -> Option { - Some(self.task_id.clone()) + fn id(&self) -> &str { + &self.task_id } #[tracing::instrument(name = "send_reply", skip(self, ctx))] diff --git a/src/kuadrant/pipeline/tasks/store.rs b/src/kuadrant/pipeline/tasks/store.rs index 9cfe5d29..8386975b 100644 --- a/src/kuadrant/pipeline/tasks/store.rs +++ b/src/kuadrant/pipeline/tasks/store.rs @@ -80,8 +80,8 @@ fn create_body_parser( } impl Task for StoreTask { - fn id(&self) -> Option { - Some(self.task_id.clone()) + fn id(&self) -> &str { + &self.task_id } #[tracing::instrument(name = "store", skip(self, ctx), level = tracing::Level::TRACE)] diff --git a/src/kuadrant/pipeline/tasks/token_usage.rs b/src/kuadrant/pipeline/tasks/token_usage.rs index 15f5d07a..d2b9b501 100644 --- a/src/kuadrant/pipeline/tasks/token_usage.rs +++ b/src/kuadrant/pipeline/tasks/token_usage.rs @@ -37,6 +37,10 @@ impl From> for TokenUsageTask { } impl Task for TokenUsageTask { + fn id(&self) -> &str { + "token_usage" + } + #[tracing::instrument(name = "token_usage", skip(self, ctx))] fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { let mut task: TokenUsageTask = self.into(); diff --git a/src/kuadrant/pipeline/tasks/tracing_decorator.rs b/src/kuadrant/pipeline/tasks/tracing_decorator.rs index a20e7061..3bef7b06 100644 --- a/src/kuadrant/pipeline/tasks/tracing_decorator.rs +++ b/src/kuadrant/pipeline/tasks/tracing_decorator.rs @@ -75,7 +75,7 @@ impl Task for TracingDecoratorTask { } } - fn id(&self) -> Option { + fn id(&self) -> &str { self.task.id() } diff --git a/tests/response_body.rs b/tests/response_body.rs index 61aff6f3..47bb6cf1 100644 --- a/tests/response_body.rs +++ b/tests/response_body.rs @@ -510,7 +510,7 @@ fn it_handles_errors_on_response_body() { Some(LogLevel::Warn), Some("Missing json property: /usage/total_tokens"), ) - .expect_log(Some(LogLevel::Error), Some("Task failed: Some(\"0\")")) + .expect_log(Some(LogLevel::Error), Some("Task failed: \"0\"")) // on response headers/body, expected action is Continue .execute_and_expect(ReturnType::Action(Action::Continue)) .unwrap(); From b23c9eeb5aab3d55f032c14c4531c1b3c966a8c4 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 9 Jun 2026 12:51:35 +0100 Subject: [PATCH 04/12] Add cel_types to task trait Signed-off-by: Adam Cattermole --- src/kuadrant/pipeline/tasks/dynamic.rs | 18 +++++++++++++++++- src/kuadrant/pipeline/tasks/mod.rs | 4 ++++ src/services/dynamic.rs | 4 ++-- src/services/dynamic/converters.rs | 19 +++++++++++++++---- src/services/mod.rs | 2 +- 5 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/kuadrant/pipeline/tasks/dynamic.rs b/src/kuadrant/pipeline/tasks/dynamic.rs index 7f638b1d..367dcff1 100644 --- a/src/kuadrant/pipeline/tasks/dynamic.rs +++ b/src/kuadrant/pipeline/tasks/dynamic.rs @@ -12,7 +12,9 @@ use crate::kuadrant::pipeline::tasks::{ }; use crate::kuadrant::ReqRespCtx; use crate::record_error; -use crate::services::{cel_value_to_header_pairs, DynamicService, MessageConverter}; +use crate::services::{ + cel_value_to_header_pairs, DescriptorConverter, DynamicService, MessageConverter, +}; pub struct DynamicTask { task_id: String, @@ -97,6 +99,20 @@ impl Task for DynamicTask { self.is_guard } + fn cel_types(&self) -> Vec { + (|| -> Result, Box> { + let input_desc = self.service.input_descriptor()?; + let output_desc = self.service.output_descriptor()?; + let mut types = DescriptorConverter::collect_struct_defs(&input_desc)?; + types.extend(DescriptorConverter::collect_struct_defs(&output_desc)?); + Ok(types) + })() + .unwrap_or_else(|e| { + error!("Failed to collect CEL types: {}", e); + vec![] + }) + } + fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { match self.predicate.test(ctx) { Ok(AttributeState::Pending) => { diff --git a/src/kuadrant/pipeline/tasks/mod.rs b/src/kuadrant/pipeline/tasks/mod.rs index f9c91f57..488b5562 100644 --- a/src/kuadrant/pipeline/tasks/mod.rs +++ b/src/kuadrant/pipeline/tasks/mod.rs @@ -34,6 +34,10 @@ pub trait Task { fn is_guard(&self) -> bool { false } + + fn cel_types(&self) -> Vec { + vec![] + } } pub struct PendingTask { diff --git a/src/services/dynamic.rs b/src/services/dynamic.rs index 4a9c4779..c661c592 100644 --- a/src/services/dynamic.rs +++ b/src/services/dynamic.rs @@ -128,11 +128,11 @@ impl DynamicService { Ok(method) } - fn input_descriptor(&self) -> Result { + pub fn input_descriptor(&self) -> Result { Ok(self.method_descriptor()?.input()) } - fn output_descriptor(&self) -> Result { + pub fn output_descriptor(&self) -> Result { Ok(self.method_descriptor()?.output()) } diff --git a/src/services/dynamic/converters.rs b/src/services/dynamic/converters.rs index dc63a3e8..398191ec 100644 --- a/src/services/dynamic/converters.rs +++ b/src/services/dynamic/converters.rs @@ -65,10 +65,10 @@ pub struct DescriptorConverter; impl DescriptorConverter { /// Register a message descriptor and all its nested message types with the CEL environment /// This must be called before evaluating CEL expressions that construct these messages - pub fn register_message_types( - env: &mut Env, + pub fn collect_struct_defs( descriptor: &MessageDescriptor, - ) -> Result<(), ConversionError> { + ) -> Result, ConversionError> { + let mut defs = Vec::new(); let mut to_register = vec![descriptor.clone()]; let mut visited = HashSet::new(); @@ -88,9 +88,20 @@ impl DescriptorConverter { } let struct_def = Self::to_struct_def(&desc)?; - env.add_struct(struct_def); + defs.push(struct_def); } + Ok(defs) + } + + pub fn register_message_types( + env: &mut Env, + descriptor: &MessageDescriptor, + ) -> Result<(), ConversionError> { + let defs = Self::collect_struct_defs(descriptor)?; + for def in defs { + env.add_struct(def); + } Ok(()) } diff --git a/src/services/mod.rs b/src/services/mod.rs index 0c398a29..16c25ba7 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -7,7 +7,7 @@ mod dynamic; mod tracing; pub use dynamic::converters::{ - cel_value_to_header_pairs, deny_response_struct_def, MessageConverter, + cel_value_to_header_pairs, deny_response_struct_def, DescriptorConverter, MessageConverter, }; pub use dynamic::DynamicService; pub use tracing::TracingService; From 5efd4ab85e1539e90428911f4a8e47d37e3c4c2e Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 9 Jun 2026 12:52:08 +0100 Subject: [PATCH 05/12] ReqRespCtx provides cel Context Signed-off-by: Adam Cattermole --- src/kuadrant/context.rs | 48 +++++++++++++-- src/kuadrant/pipeline/mod.rs | 2 +- src/kuadrant/pipeline/tasks/dynamic.rs | 82 ++++++++++++-------------- 3 files changed, 83 insertions(+), 49 deletions(-) diff --git a/src/kuadrant/context.rs b/src/kuadrant/context.rs index c864ba2c..06a156da 100644 --- a/src/kuadrant/context.rs +++ b/src/kuadrant/context.rs @@ -1,12 +1,13 @@ -use cel::Value; +use cel::{Context, Env, Value}; use std::cell::OnceCell; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; -use tracing::{debug, warn}; +use tracing::{debug, error, warn}; use crate::data::attribute::{wasm_prop, AttributeError, AttributeState, AttributeValue, Path}; use crate::data::{Expression, Headers}; use crate::kuadrant::cache::{AttributeCache, CachedValue}; +use crate::kuadrant::pipeline::tasks::Task; use crate::kuadrant::resolver::{AttributeResolver, ProxyWasmHost}; use crate::services::ServiceError; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -28,6 +29,7 @@ pub struct ReqRespCtx { tracker: Tracker, stored_values: BTreeMap, pub barrier: Barrier, + pub cel: CelScope, } impl Default for ReqRespCtx { @@ -49,6 +51,7 @@ impl ReqRespCtx { tracker: Tracker::default(), stored_values: BTreeMap::new(), barrier: Barrier::default(), + cel: CelScope::default(), } } @@ -467,7 +470,7 @@ impl Barrier { match self.count.checked_sub(1) { Some(new_value) => self.count = new_value, None => { - tracing::error!( + error!( "Attempted to lower upstream barrier when count is already 0 - mismatched raise/lower pairs" ); } @@ -514,6 +517,43 @@ impl BodyContext { } } +pub struct CelScope { + env: Arc, + registered: HashSet, +} + +impl Default for CelScope { + fn default() -> Self { + Self { + env: Arc::new(Env::stdlib()), + registered: HashSet::new(), + } + } +} + +impl CelScope { + pub fn new_ctx(&mut self, task: &dyn Task) -> Context<'static> { + let task_id = task.id(); + + if !self.registered.contains(task.id()) { + let types = task.cel_types(); + if !types.is_empty() { + match Arc::get_mut(&mut self.env) { + Some(env) => { + for type_def in types { + env.add_struct(type_def); + } + } + None => error!("Failed to add CEL types: Arc refcount > 1"), + } + } + self.registered.insert(task_id.to_string()); + } + + Context::with_env(Arc::clone(&self.env)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/kuadrant/pipeline/mod.rs b/src/kuadrant/pipeline/mod.rs index 5fd928bb..e28bd48e 100644 --- a/src/kuadrant/pipeline/mod.rs +++ b/src/kuadrant/pipeline/mod.rs @@ -1,7 +1,7 @@ mod blueprint; mod executor; mod factory; -mod tasks; +pub(crate) mod tasks; pub(crate) use executor::{Pipeline, PipelineState}; pub(crate) use factory::PipelineFactory; diff --git a/src/kuadrant/pipeline/tasks/dynamic.rs b/src/kuadrant/pipeline/tasks/dynamic.rs index 367dcff1..2015cca1 100644 --- a/src/kuadrant/pipeline/tasks/dynamic.rs +++ b/src/kuadrant/pipeline/tasks/dynamic.rs @@ -30,7 +30,7 @@ pub struct DynamicTask { impl DynamicTask { #[allow(clippy::too_many_arguments)] pub fn new_with_attributes( - ctx: &ReqRespCtx, + ctx: &mut ReqRespCtx, task_id: String, service: Rc, name: String, @@ -40,40 +40,7 @@ impl DynamicTask { dependencies: Vec, is_guard: bool, ) -> Self { - // Warm up the cache - let _ = predicate.test(ctx); - if let Ok(env) = service.cel_env() { - let mut cel_ctx = cel::Context::with_env(env); - let _ = message_builder.eval(ctx, &mut cel_ctx); - - for action in &on_reply { - let _ = action.predicate.test_with_ctx(ctx, &mut cel_ctx); - match &action.operation { - Operation::Grpc { - message_builder, - on_reply: nested_on_reply, - .. - } => { - let _ = message_builder.eval(ctx, &mut cel_ctx); - for nested_action in nested_on_reply { - let _ = nested_action.predicate.test_with_ctx(ctx, &mut cel_ctx); - } - } - Operation::Deny { deny_with } => { - let _ = deny_with.eval(ctx, &mut cel_ctx); - } - Operation::Headers { headers, .. } => { - let _ = headers.eval(ctx, &mut cel_ctx); - } - Operation::Store { expression, .. } => { - let _ = expression.eval(ctx, &mut cel_ctx); - } - Operation::Fail { .. } => {} - } - } - } - - Self { + let task = Self { task_id, service, name, @@ -82,6 +49,41 @@ impl DynamicTask { predicate, dependencies, is_guard, + }; + + task.warm(ctx); + task + } + + fn warm(&self, ctx: &mut ReqRespCtx) { + let _ = self.predicate.test(ctx); + let mut cel_ctx = ctx.cel.new_ctx(self); + let _ = self.message_builder.eval(ctx, &mut cel_ctx); + + for action in &self.on_reply { + let _ = action.predicate.test_with_ctx(ctx, &mut cel_ctx); + match &action.operation { + Operation::Grpc { + message_builder, + on_reply: nested_on_reply, + .. + } => { + let _ = message_builder.eval(ctx, &mut cel_ctx); + for nested_action in nested_on_reply { + let _ = nested_action.predicate.test_with_ctx(ctx, &mut cel_ctx); + } + } + Operation::Deny { deny_with } => { + let _ = deny_with.eval(ctx, &mut cel_ctx); + } + Operation::Headers { headers, .. } => { + let _ = headers.eval(ctx, &mut cel_ctx); + } + Operation::Store { expression, .. } => { + let _ = expression.eval(ctx, &mut cel_ctx); + } + Operation::Fail { .. } => {} + } } } } @@ -135,15 +137,7 @@ impl Task for DynamicTask { tracing::debug_span!("dynamic_request", task_id = self.task_id, name = self.name) .entered(); - let env = match self.service.cel_env() { - Ok(env) => env, - Err(e) => { - error!("Failed to get CEL environment: {e}"); - return TaskOutcome::Failed; - } - }; - - let mut cel_ctx = cel::Context::with_env(env); + let mut cel_ctx = ctx.cel.new_ctx(&*self); let cel_value = match self.message_builder.eval(ctx, &mut cel_ctx) { Ok(AttributeState::Pending) => { return if ctx.response_body.is_end_of_stream() { From 49ff5b4e0ea55698c3721a85b4e878739d95ac8f Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 9 Jun 2026 14:38:47 +0100 Subject: [PATCH 06/12] All tasks leverage ctx.cel.new_ctx Signed-off-by: Adam Cattermole --- src/kuadrant/pipeline/tasks/headers.rs | 2 +- src/kuadrant/pipeline/tasks/send_reply.rs | 12 ++++++------ src/kuadrant/pipeline/tasks/store.rs | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/kuadrant/pipeline/tasks/headers.rs b/src/kuadrant/pipeline/tasks/headers.rs index e4077d43..5c16274b 100644 --- a/src/kuadrant/pipeline/tasks/headers.rs +++ b/src/kuadrant/pipeline/tasks/headers.rs @@ -111,7 +111,7 @@ impl Task for ModifyHeadersTask { let operation = match &self.mode { HeadersMode::Concrete { operation } => operation.clone(), HeadersMode::Deferred { headers_expr } => { - let mut cel_ctx = cel::Context::default(); + let mut cel_ctx = ctx.cel.new_ctx(&*self); match headers_expr.eval(ctx, &mut cel_ctx) { Ok(AttributeState::Pending) => { error!("Unexpected pending state in headers expression"); diff --git a/src/kuadrant/pipeline/tasks/send_reply.rs b/src/kuadrant/pipeline/tasks/send_reply.rs index b1b23721..5ac79a33 100644 --- a/src/kuadrant/pipeline/tasks/send_reply.rs +++ b/src/kuadrant/pipeline/tasks/send_reply.rs @@ -1,7 +1,5 @@ -use std::sync::Arc; - use cel::common::types::{CelString, CelUInt}; -use cel::{Env, Value}; +use cel::Value; use tracing::error; use crate::data::attribute::AttributeState; @@ -104,6 +102,10 @@ impl Task for SendReplyTask { &self.task_id } + fn cel_types(&self) -> Vec { + vec![deny_response_struct_def()] + } + #[tracing::instrument(name = "send_reply", skip(self, ctx))] fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { if let Some(ref predicate) = self.predicate { @@ -121,9 +123,7 @@ impl Task for SendReplyTask { } let (status_code, headers, body) = { - let mut env = Env::stdlib(); - env.add_struct(deny_response_struct_def()); - let mut cel_ctx = cel::Context::with_env(Arc::new(env)); + let mut cel_ctx = ctx.cel.new_ctx(&*self); match self.deny_with.eval(ctx, &mut cel_ctx) { Ok(AttributeState::Pending) => { error!("Unexpected pending state in deny expression"); diff --git a/src/kuadrant/pipeline/tasks/store.rs b/src/kuadrant/pipeline/tasks/store.rs index 8386975b..aa331981 100644 --- a/src/kuadrant/pipeline/tasks/store.rs +++ b/src/kuadrant/pipeline/tasks/store.rs @@ -156,7 +156,7 @@ impl Task for StoreTask { } } - let mut cel_ctx = cel::Context::default(); + let mut cel_ctx = ctx.cel.new_ctx(&*self); let value = match self.expression.eval(ctx, &mut cel_ctx) { Ok(AttributeState::Pending) => { return TaskOutcome::Requeued(vec![self]); From eda890db7159ec46f65cef644f27ee5bab038a04 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 9 Jun 2026 15:43:27 +0100 Subject: [PATCH 07/12] Retain hierarchical bindings Signed-off-by: Adam Cattermole --- src/kuadrant/context.rs | 87 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 2 deletions(-) diff --git a/src/kuadrant/context.rs b/src/kuadrant/context.rs index 06a156da..95d04403 100644 --- a/src/kuadrant/context.rs +++ b/src/kuadrant/context.rs @@ -520,6 +520,7 @@ impl BodyContext { pub struct CelScope { env: Arc, registered: HashSet, + bindings: BTreeMap>, } impl Default for CelScope { @@ -527,6 +528,7 @@ impl Default for CelScope { Self { env: Arc::new(Env::stdlib()), registered: HashSet::new(), + bindings: BTreeMap::new(), } } } @@ -535,7 +537,7 @@ impl CelScope { pub fn new_ctx(&mut self, task: &dyn Task) -> Context<'static> { let task_id = task.id(); - if !self.registered.contains(task.id()) { + if !self.registered.contains(task_id) { let types = task.cel_types(); if !types.is_empty() { match Arc::get_mut(&mut self.env) { @@ -550,10 +552,29 @@ impl CelScope { self.registered.insert(task_id.to_string()); } - Context::with_env(Arc::clone(&self.env)) + let mut ctx = Context::with_env(Arc::clone(&self.env)); + for (scope_id, scope_bindings) in &self.bindings { + if is_ancestor(scope_id, task_id) { + for (name, value) in scope_bindings { + ctx.add_variable_from_value(name, value.clone()); + } + } + } + ctx + } + + pub fn add_scoped_binding(&mut self, task_id: &str, name: String, val: Value) { + self.bindings + .entry(task_id.to_string()) + .or_default() + .push((name, val)); } } +fn is_ancestor(scope_id: &str, task_id: &str) -> bool { + task_id == scope_id || task_id.starts_with(&format!("{}.", scope_id)) +} + #[cfg(test)] mod tests { use super::*; @@ -794,4 +815,66 @@ mod tests { Ok(AttributeState::Available(Some(ref s))) if s == "external-user-id" )); } + + #[test] + fn test_cel_scope_hierarchical_bindings() { + use crate::kuadrant::pipeline::tasks::{Task, TaskOutcome}; + + struct MockTask { + id: String, + } + impl Task for MockTask { + fn id(&self) -> &str { + &self.id + } + fn apply(self: Box, _ctx: &mut ReqRespCtx) -> TaskOutcome { + TaskOutcome::Done + } + } + + let mut scope = CelScope::default(); + + // Task "0" gets a binding + scope.add_scoped_binding( + "0", + "my_response".to_string(), + Value::String(Arc::new("user123".to_string())), + ); + + // Task "0.0" should see "my_response" from parent "0" + let task_0_0 = MockTask { + id: "0.0".to_string(), + }; + let ctx_0_0 = scope.new_ctx(&task_0_0); + assert!(ctx_0_0.get_variable("my_response").is_some()); + + // Task "1" should NOT see "my_response" (different branch) + let task_1 = MockTask { + id: "1".to_string(), + }; + let ctx_1 = scope.new_ctx(&task_1); + assert!(ctx_1.get_variable("my_response").is_none()); + } + + #[test] + fn test_is_ancestor() { + // Same task + assert!(is_ancestor("0", "0")); + + // Direct child + assert!(is_ancestor("0", "0.0")); + + // Nested child + assert!(is_ancestor("0", "0.0.1")); + + // Different branch + assert!(!is_ancestor("0", "1")); + assert!(!is_ancestor("0", "1.0")); + + // Sibling + assert!(!is_ancestor("0.0", "0.1")); + + // Parent relationship is not symmetric + assert!(!is_ancestor("0.0", "0")); + } } From 2394d94b6c48fc0d990f5c083976c24abcf012f8 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 9 Jun 2026 16:11:37 +0100 Subject: [PATCH 08/12] on_reply actions are now Tasks Signed-off-by: Adam Cattermole --- src/kuadrant/pipeline/blueprint.rs | 230 ++++++++++++++----------- src/kuadrant/pipeline/tasks/dynamic.rs | 204 ++-------------------- 2 files changed, 140 insertions(+), 294 deletions(-) diff --git a/src/kuadrant/pipeline/blueprint.rs b/src/kuadrant/pipeline/blueprint.rs index 2725bcaf..85075519 100644 --- a/src/kuadrant/pipeline/blueprint.rs +++ b/src/kuadrant/pipeline/blueprint.rs @@ -60,6 +60,101 @@ pub(crate) enum Operation { } impl Action { + fn to_core_task(&self, ctx: &mut ReqRespCtx) -> Option> { + match &self.operation { + Operation::Grpc { + service, + var, + message_builder, + on_reply, + } => { + let children: Vec> = on_reply + .iter() + .filter_map(|a| a.to_core_task(ctx)) + .collect(); + + match service { + ServiceInstance::Dynamic(dynamic_service) + | ServiceInstance::Auth(dynamic_service) + | ServiceInstance::RateLimit(dynamic_service) + | ServiceInstance::RateLimitCheck(dynamic_service) + | ServiceInstance::RateLimitReport(dynamic_service) => { + Some(Box::new(DynamicTask::new_with_attributes( + ctx, + self.id.clone(), + Rc::clone(dynamic_service), + var.clone(), + message_builder.clone(), + children, + self.predicate.clone(), + self.dependencies.clone(), + self.is_guard, + ))) + } + ServiceInstance::Tracing(_) => { + ctx.set_public_tracker_id(var.clone()); + Some(Box::new(ModifyHeadersTask::new( + self.id.clone(), + HeaderOperation::Append( + vec![(var.clone(), ctx.request_id().to_string())].into(), + ), + HeadersType::HttpResponseHeaders, + ))) + } + } + } + Operation::Deny { deny_with } => { + use crate::kuadrant::pipeline::tasks::SendReplyTask; + Some(Box::new(SendReplyTask::new_deferred( + self.id.clone(), + self.predicate.clone(), + deny_with.clone(), + self.terminal, + ))) + } + Operation::Headers { + target, + headers: headers_expr, + } => Some(Box::new(ModifyHeadersTask::new_deferred( + self.id.clone(), + self.predicate.clone(), + headers_expr.clone(), + target.clone(), + self.terminal, + ))), + Operation::Store { + path, + expression, + export_to_host, + } => { + use crate::kuadrant::pipeline::tasks::StoreTask; + match StoreTask::new( + self.id.clone(), + self.predicate.clone(), + expression.clone(), + path.clone(), + *export_to_host, + self.terminal, + ) { + Ok(task) => Some(Box::new(task)), + Err(e) => { + tracing::error!( + "Failed to create StoreTask for path '{}': {}. Action {} will be skipped.", + path, + e, + self.id + ); + None + } + } + } + Operation::Fail { log_message } => { + tracing::error!("Fail operation: Action {}: {}", self.id, log_message); + None + } + } + } + pub fn collect_body_values(&self, request_data: &[RequestData]) -> Vec { use std::collections::HashSet; @@ -234,54 +329,33 @@ impl Blueprint { for action in &self.actions { match &action.operation { - Operation::Grpc { - service, - var, - message_builder, - on_reply, - } => { - let abort_on_failure = - service.failure_mode() == configuration::FailureMode::Deny; - - match service { - ServiceInstance::Tracing(tracing_service) => { - ctx.set_public_tracker_id(var.clone()); - tasks.push(Box::new(ModifyHeadersTask::new( - action.id.clone(), - HeaderOperation::Append( - vec![(var.clone(), ctx.request_id().to_string())].into(), - ), - HeadersType::HttpResponseHeaders, + Operation::Grpc { service, .. } => match service { + ServiceInstance::Tracing(tracing_service) => { + if let Some(task) = action.to_core_task(ctx) { + tasks.push(task); + } + if let Some(service) = tracing_service { + teardown_tasks + .push(Box::new(ExportTracesTask::new(ctx, service.clone()))); + } + } + ServiceInstance::Dynamic(_) + | ServiceInstance::Auth(_) + | ServiceInstance::RateLimit(_) + | ServiceInstance::RateLimitCheck(_) + | ServiceInstance::RateLimitReport(_) => { + let body_values = action.collect_body_values(request_data); + if !body_values.is_empty() { + tasks.push(Box::new(TokenUsageTask::with_expected_response_fields( + body_values, ))); - if let Some(service) = tracing_service { - teardown_tasks - .push(Box::new(ExportTracesTask::new(ctx, service.clone()))); - } } - ServiceInstance::Dynamic(dynamic_service) - | ServiceInstance::Auth(dynamic_service) - | ServiceInstance::RateLimit(dynamic_service) - | ServiceInstance::RateLimitCheck(dynamic_service) - | ServiceInstance::RateLimitReport(dynamic_service) => { - let body_values = action.collect_body_values(request_data); - if !body_values.is_empty() { - tasks.push(Box::new( - TokenUsageTask::with_expected_response_fields(body_values), - )); - } - let task: Box = Box::new(DynamicTask::new_with_attributes( - ctx, - action.id.clone(), - Rc::clone(dynamic_service), - var.clone(), - message_builder.clone(), - on_reply.clone(), - action.predicate.clone(), - action.dependencies.clone(), - action.is_guard, - )); - let task = Box::new(FailureModeTask::new(task, abort_on_failure)); + if let Some(mut task) = action.to_core_task(ctx) { + let abort_on_failure = + service.failure_mode() == configuration::FailureMode::Deny; + task = Box::new(FailureModeTask::new(task, abort_on_failure)); + if tracing_enabled { let span_label = match service { ServiceInstance::Auth(_) => "auth", @@ -290,72 +364,22 @@ impl Blueprint { ServiceInstance::RateLimitReport(_) => "ratelimit_report", _ => "dynamic", }; - tasks.push(Box::new(TracingDecoratorTask::new( + task = Box::new(TracingDecoratorTask::new( span_label, task, action.sources.clone(), - ))); - } else { - tasks.push(task); + )); } + + tasks.push(task); } } - } - Operation::Deny { deny_with } => { - use crate::kuadrant::pipeline::tasks::SendReplyTask; - let task = SendReplyTask::new_deferred( - action.id.clone(), - action.predicate.clone(), - deny_with.clone(), - action.terminal, - ); - tasks.push(Box::new(task)); - } - Operation::Headers { - target, - headers: headers_expr, - } => { - let task = ModifyHeadersTask::new_deferred( - action.id.clone(), - action.predicate.clone(), - headers_expr.clone(), - target.clone(), - action.terminal, - ); - tasks.push(Box::new(task)); - } - Operation::Store { - path, - expression, - export_to_host, - } => { - use crate::kuadrant::pipeline::tasks::StoreTask; - match StoreTask::new( - action.id.clone(), - action.predicate.clone(), - expression.clone(), - path.clone(), - *export_to_host, - action.terminal, - ) { - Ok(task) => tasks.push(Box::new(task)), - Err(e) => { - tracing::error!( - "Failed to create StoreTask for path '{}': {}. Action {} will be skipped.", - path, - e, - action.id - ); - } + }, + _ => { + if let Some(task) = action.to_core_task(ctx) { + tasks.push(task); } } - Operation::Fail { log_message } => { - tracing::error!( - "Top-level Fail operation is currently unsupported. Action {}: {}", - action.id, - log_message - ); - } } } diff --git a/src/kuadrant/pipeline/tasks/dynamic.rs b/src/kuadrant/pipeline/tasks/dynamic.rs index 2015cca1..5f46598a 100644 --- a/src/kuadrant/pipeline/tasks/dynamic.rs +++ b/src/kuadrant/pipeline/tasks/dynamic.rs @@ -1,27 +1,21 @@ use std::rc::Rc; -use cel::Value; use tracing::{debug, error}; use crate::data::attribute::AttributeState; use crate::data::cel::Predicate; use crate::data::Expression; -use crate::kuadrant::pipeline::blueprint::{Action, Operation}; -use crate::kuadrant::pipeline::tasks::{ - HeaderOperation, ModifyHeadersTask, PendingTask, SendReplyTask, Task, TaskOutcome, -}; +use crate::kuadrant::pipeline::tasks::{PendingTask, Task, TaskOutcome}; use crate::kuadrant::ReqRespCtx; use crate::record_error; -use crate::services::{ - cel_value_to_header_pairs, DescriptorConverter, DynamicService, MessageConverter, -}; +use crate::services::{DescriptorConverter, DynamicService}; pub struct DynamicTask { task_id: String, service: Rc, name: String, message_builder: Expression, - on_reply: Vec, + on_reply: Vec>, predicate: Predicate, dependencies: Vec, is_guard: bool, @@ -35,7 +29,7 @@ impl DynamicTask { service: Rc, name: String, message_builder: Expression, - on_reply: Vec, + on_reply: Vec>, predicate: Predicate, dependencies: Vec, is_guard: bool, @@ -59,32 +53,6 @@ impl DynamicTask { let _ = self.predicate.test(ctx); let mut cel_ctx = ctx.cel.new_ctx(self); let _ = self.message_builder.eval(ctx, &mut cel_ctx); - - for action in &self.on_reply { - let _ = action.predicate.test_with_ctx(ctx, &mut cel_ctx); - match &action.operation { - Operation::Grpc { - message_builder, - on_reply: nested_on_reply, - .. - } => { - let _ = message_builder.eval(ctx, &mut cel_ctx); - for nested_action in nested_on_reply { - let _ = nested_action.predicate.test_with_ctx(ctx, &mut cel_ctx); - } - } - Operation::Deny { deny_with } => { - let _ = deny_with.eval(ctx, &mut cel_ctx); - } - Operation::Headers { headers, .. } => { - let _ = headers.eval(ctx, &mut cel_ctx); - } - Operation::Store { expression, .. } => { - let _ = expression.eval(ctx, &mut cel_ctx); - } - Operation::Fail { .. } => {} - } - } } } @@ -165,7 +133,7 @@ impl Task for DynamicTask { let service = self.service.clone(); let task_id = self.task_id.clone(); let name = self.name.clone(); - let on_reply = self.on_reply.clone(); + let on_reply = self.on_reply; let is_guard = self.is_guard; if is_guard { @@ -178,7 +146,7 @@ impl Task for DynamicTask { task_id.clone(), Box::new(move |ctx| { let outcome = process_dynamic_response( - ctx, &service, &task_id, token_id, &name, &on_reply, + ctx, &service, &task_id, token_id, &name, on_reply, ); if is_guard { ctx.barrier.lower(); @@ -193,11 +161,11 @@ impl Task for DynamicTask { fn process_dynamic_response( ctx: &mut ReqRespCtx, - service: &DynamicService, + _service: &DynamicService, task_id: &str, token_id: u32, - name: &str, - on_reply: &[Action], + _name: &str, + on_reply: Vec>, ) -> TaskOutcome { let span = tracing::debug_span!( "dynamic_response", @@ -209,7 +177,7 @@ fn process_dynamic_response( ) .entered(); - let (status_code, response_size) = match ctx.get_grpc_response_data() { + let (status_code, _response_size) = match ctx.get_grpc_response_data() { Ok(data) => data, Err(e) => { record_error!("Failed to get gRPC response: {e:?}"); @@ -224,156 +192,10 @@ fn process_dynamic_response( } if on_reply.is_empty() { - debug!("No onReply actions, completing"); + debug!("No onReply tasks, completing"); return TaskOutcome::Done; } - let mut cel_ctx = match service.response_cel_context(ctx, response_size, name) { - Ok(c) => c, - Err(e) => { - record_error!("Failed to build response context: {e:?}"); - return TaskOutcome::Failed; - } - }; - - let mut tasks: Vec> = Vec::new(); - - for action in on_reply { - match action.predicate.test_with_ctx(ctx, &mut cel_ctx) { - Ok(AttributeState::Available(true)) => {} - Ok(AttributeState::Available(false)) => continue, - Ok(AttributeState::Pending) => { - //todo(@adam-cattermole): if we requeue here, we lose predicates as headers/store/sendreply are not modelled with predicates - } - Err(e) => { - error!("Failed to apply predicates: {e:?}"); - return TaskOutcome::Failed; - } - } - - match &action.operation { - Operation::Deny { deny_with } => match deny_with.eval(ctx, &mut cel_ctx) { - Ok(AttributeState::Pending) => { - error!("Unexpected pending state in onReply deny"); - return TaskOutcome::Failed; - } - Ok(AttributeState::Available(val @ Value::Struct(_))) => { - match SendReplyTask::try_from(val) { - Ok(task) => { - if action.terminal { - return TaskOutcome::Terminate(Box::new(task)); - } - tasks.push(Box::new(task)); - } - Err(e) => { - error!("Invalid DenyResponse: {e}"); - return TaskOutcome::Failed; - } - } - } - Ok(AttributeState::Available(other)) => { - error!("denyWith must return DenyResponse, got: {other:?}"); - return TaskOutcome::Failed; - } - Err(e) => { - error!("Failed to evaluate denyWith expression: {e}"); - return TaskOutcome::Failed; - } - }, - Operation::Headers { target, headers } => match headers.eval(ctx, &mut cel_ctx) { - Ok(AttributeState::Available(ref val)) => { - let pairs = cel_value_to_header_pairs(val); - if !pairs.is_empty() { - tasks.push(Box::new(ModifyHeadersTask::new( - "inline_response_header".to_string(), - HeaderOperation::Set(pairs.into()), - target.clone(), - ))); - } - } - Ok(AttributeState::Pending) => { - error!("Unexpected pending state in onReply headers"); - return TaskOutcome::Failed; - } - Err(e) => { - error!("Failed to evaluate headers expression: {e}"); - return TaskOutcome::Failed; - } - }, - Operation::Store { - path, - expression, - export_to_host, - } => match expression.eval(ctx, &mut cel_ctx) { - // todo(@adam-cattermole): this should be delegated to the StoreTask - Ok(AttributeState::Available(val)) => { - if *export_to_host { - match MessageConverter::cel_value_to_bytes(&val) { - Ok(bytes) => { - if let Err(e) = ctx.set_attribute(path, &bytes) { - error!("Failed to store attribute {path}: {e:?}"); - return TaskOutcome::Failed; - } - } - Err(e) => { - error!("Failed to convert value to bytes for '{path}': {e}"); - return TaskOutcome::Failed; - } - } - } - ctx.store_value(path.clone(), val); - } - Ok(AttributeState::Pending) => { - error!("Unexpected pending state in onReply store for '{path}'"); - return TaskOutcome::Failed; - } - Err(e) => { - error!("Failed to evaluate store expression for '{path}': {e}"); - return TaskOutcome::Failed; - } - }, - Operation::Fail { log_message } => { - error!("Action failure: {log_message}"); - return TaskOutcome::Failed; - } - Operation::Grpc { - service, - var, - message_builder, - on_reply: nested_on_reply, - } => match service { - crate::services::ServiceInstance::Dynamic(dynamic_service) - | crate::services::ServiceInstance::Auth(dynamic_service) - | crate::services::ServiceInstance::RateLimit(dynamic_service) - | crate::services::ServiceInstance::RateLimitCheck(dynamic_service) - | crate::services::ServiceInstance::RateLimitReport(dynamic_service) => { - let task = Box::new(DynamicTask::new_with_attributes( - ctx, - action.id.clone(), - Rc::clone(dynamic_service), - var.clone(), - message_builder.clone(), - nested_on_reply.clone(), - action.predicate.clone(), - action.dependencies.clone(), - action.is_guard, - )); - if action.terminal { - return TaskOutcome::Terminate(task); - } - tasks.push(task); - } - _ => { - error!("Unsupported service type for nested gRPC operation"); - return TaskOutcome::Failed; - } - }, - } - } - - if tasks.is_empty() { - TaskOutcome::Done - } else { - TaskOutcome::Requeued(tasks) - } + // todo(@adam-cattermole): Add response variable binding here with ctx.cel.add_scoped_binding() + TaskOutcome::Requeued(on_reply) } From 73ae4843790003b086e9a4fcb118b0dac8cedcf8 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 9 Jun 2026 16:43:52 +0100 Subject: [PATCH 09/12] Add response var binding to context Signed-off-by: Adam Cattermole --- src/kuadrant/pipeline/tasks/dynamic.rs | 44 ++++++++++++++++--- src/services/dynamic.rs | 60 ++++++++------------------ src/services/dynamic/converters.rs | 43 +++++++++--------- 3 files changed, 79 insertions(+), 68 deletions(-) diff --git a/src/kuadrant/pipeline/tasks/dynamic.rs b/src/kuadrant/pipeline/tasks/dynamic.rs index 5f46598a..d1688d2d 100644 --- a/src/kuadrant/pipeline/tasks/dynamic.rs +++ b/src/kuadrant/pipeline/tasks/dynamic.rs @@ -161,10 +161,10 @@ impl Task for DynamicTask { fn process_dynamic_response( ctx: &mut ReqRespCtx, - _service: &DynamicService, + service: &DynamicService, task_id: &str, token_id: u32, - _name: &str, + name: &str, on_reply: Vec>, ) -> TaskOutcome { let span = tracing::debug_span!( @@ -177,7 +177,7 @@ fn process_dynamic_response( ) .entered(); - let (status_code, _response_size) = match ctx.get_grpc_response_data() { + let (status_code, response_size) = match ctx.get_grpc_response_data() { Ok(data) => data, Err(e) => { record_error!("Failed to get gRPC response: {e:?}"); @@ -196,6 +196,40 @@ fn process_dynamic_response( return TaskOutcome::Done; } - // todo(@adam-cattermole): Add response variable binding here with ctx.cel.add_scoped_binding() - TaskOutcome::Requeued(on_reply) + let cel_value = match service.get_response_cel_value(ctx, response_size) { + Ok(val) => val, + Err(e) => { + error!("Failed to get response CEL value: {e}"); + return TaskOutcome::Failed; + } + }; + + ctx.cel + .add_scoped_binding(task_id, name.to_string(), cel_value); + + let mut remaining_tasks: Vec> = Vec::new(); + for child in on_reply { + match child.apply(ctx) { + TaskOutcome::Done => {} + TaskOutcome::Terminate(t) => return TaskOutcome::Terminate(t), + TaskOutcome::Deferred { + token_id: _, + pending: _, + } => { + todo!("(@adam-cattermole): this is new and yet to be supported - deferred tasks generating deferred tasks") + } + TaskOutcome::Requeued(mut requeued) => { + remaining_tasks.append(&mut requeued); + } + TaskOutcome::Failed => { + return TaskOutcome::Failed; + } + } + } + + if remaining_tasks.is_empty() { + TaskOutcome::Done + } else { + TaskOutcome::Requeued(remaining_tasks) + } } diff --git a/src/services/dynamic.rs b/src/services/dynamic.rs index c661c592..4d85bcae 100644 --- a/src/services/dynamic.rs +++ b/src/services/dynamic.rs @@ -1,9 +1,7 @@ -use std::cell::OnceCell; use std::rc::Rc; -use std::sync::Arc; use std::time::Duration; -use cel::{Context, Env, Value}; +use cel::Value; use prost::Message; use prost_reflect::DynamicMessage; use tracing::debug; @@ -15,7 +13,7 @@ use crate::kuadrant::ReqRespCtx; pub mod converters; -use converters::{deny_response_struct_def, DescriptorConverter, MessageConverter}; +use converters::MessageConverter; pub struct DynamicService { upstream_name: String, @@ -24,7 +22,6 @@ pub struct DynamicService { timeout: Duration, failure_mode: FailureMode, descriptor_manager: Rc, - cel_env: OnceCell>, } impl DynamicService { @@ -45,7 +42,6 @@ impl DynamicService { timeout, failure_mode, descriptor_manager, - cel_env: Default::default(), } } @@ -53,27 +49,6 @@ impl DynamicService { self.failure_mode } - pub fn cel_env(&self) -> Result, ServiceError> { - match self.cel_env.get() { - Some(env) => Ok(Arc::clone(env)), - None => { - let input_descriptor = self.input_descriptor()?; - let output_descriptor = self.output_descriptor()?; - let mut env = Env::stdlib(); - DescriptorConverter::register_message_types(&mut env, &input_descriptor).map_err( - |e| ServiceError::Dispatch(format!("Failed to register message types: {}", e)), - )?; - DescriptorConverter::register_message_types(&mut env, &output_descriptor).map_err( - |e| ServiceError::Dispatch(format!("Failed to register message types: {}", e)), - )?; - env.add_struct(deny_response_struct_def()); - let env_arc = Arc::new(env); - let _ = self.cel_env.set(Arc::clone(&env_arc)); - Ok(env_arc) - } - } - } - pub fn dispatch_value( &self, ctx: &mut ReqRespCtx, @@ -136,21 +111,14 @@ impl DynamicService { Ok(self.method_descriptor()?.output()) } - pub fn response_cel_context( + pub fn get_response_cel_value( &self, ctx: &mut ReqRespCtx, response_size: usize, - name: &str, - ) -> Result, ServiceError> { + ) -> Result { let response = self.get_response(ctx, response_size)?; - let cel_value = MessageConverter::dynamic_message_to_cel(&response).map_err(|e| { - ServiceError::Decode(format!("Failed to convert message to CEL: {}", e)) - })?; - let env = self.cel_env()?; - - let mut cel_ctx = Context::with_env(env); - cel_ctx.add_variable_from_value(name, cel_value); - Ok(cel_ctx) + MessageConverter::dynamic_message_to_cel(&response) + .map_err(|e| ServiceError::Decode(format!("Failed to convert message to CEL: {}", e))) } } @@ -190,9 +158,14 @@ impl Service for DynamicService { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; - use crate::filter::{DescriptorKey, DescriptorManager}; - use cel::Program; + use crate::{ + filter::{DescriptorKey, DescriptorManager}, + services::DescriptorConverter, + }; + use cel::{Context, Env, Program}; use prost_reflect::DescriptorPool; use prost_types::{ field_descriptor_proto, DescriptorProto, FieldDescriptorProto, FileDescriptorProto, @@ -279,8 +252,11 @@ mod tests { let input_desc = method_desc.input(); let mut env = Env::stdlib(); - DescriptorConverter::register_message_types(&mut env, &input_desc) - .expect("Failed to register types"); + for def in DescriptorConverter::collect_struct_defs(&input_desc) + .expect("Failed to collect struct defs") + { + env.add_struct(def); + } let cel_ctx = Context::with_env(Arc::new(env)); let program = Program::compile(cel_expression).expect("Failed to compile"); diff --git a/src/services/dynamic/converters.rs b/src/services/dynamic/converters.rs index 398191ec..be0819ed 100644 --- a/src/services/dynamic/converters.rs +++ b/src/services/dynamic/converters.rs @@ -1,6 +1,6 @@ use cel::common::types::*; use cel::objects::Key; -use cel::{Env, StructDef, Value}; +use cel::{StructDef, Value}; use prost_reflect::Cardinality; use prost_reflect::{ DynamicMessage, FieldDescriptor, Kind as ProtoKind, MapKey, MessageDescriptor, ReflectMessage, @@ -94,17 +94,6 @@ impl DescriptorConverter { Ok(defs) } - pub fn register_message_types( - env: &mut Env, - descriptor: &MessageDescriptor, - ) -> Result<(), ConversionError> { - let defs = Self::collect_struct_defs(descriptor)?; - for def in defs { - env.add_struct(def); - } - Ok(()) - } - /// Convert a protobuf MessageDescriptor to a CEL StructDef pub fn to_struct_def(descriptor: &MessageDescriptor) -> Result { let mut struct_def = StructDef::new(descriptor.full_name().to_string()); @@ -891,7 +880,7 @@ impl MessageConverter { mod tests { use super::*; use cel::common::value::Val; - use cel::{Context, Program}; + use cel::{Context, Env, Program}; use prost::Message; use prost_types::{field_descriptor_proto, DescriptorProto, FieldDescriptorProto}; use prost_types::{FileDescriptorProto, FileDescriptorSet, OneofDescriptorProto}; @@ -1052,8 +1041,11 @@ mod tests { // Register all message types let mut env = cel::Env::stdlib(); - DescriptorConverter::register_message_types(&mut env, &outer_descriptor) - .expect("Failed to register types"); + for def in DescriptorConverter::collect_struct_defs(&outer_descriptor) + .expect("Failed to collect struct defs") + { + env.add_struct(def); + } let ctx = Context::with_env(Arc::new(env)); @@ -1455,8 +1447,11 @@ mod tests { .expect("MapMessage not found"); let mut env = cel::Env::stdlib(); - DescriptorConverter::register_message_types(&mut env, &descriptor) - .expect("Failed to register types"); + for def in DescriptorConverter::collect_struct_defs(&descriptor) + .expect("Failed to collect struct defs") + { + env.add_struct(def); + } let ctx = Context::with_env(Arc::new(env)); @@ -1897,10 +1892,16 @@ mod tests { .expect("Failed to get Request descriptor"); let mut env = Env::default(); - DescriptorConverter::register_message_types(&mut env, ×tamp_desc) - .expect("Failed to register Timestamp"); - DescriptorConverter::register_message_types(&mut env, &request_desc) - .expect("Failed to register Request"); + for def in DescriptorConverter::collect_struct_defs(×tamp_desc) + .expect("Failed to collect struct defs") + { + env.add_struct(def); + } + for def in DescriptorConverter::collect_struct_defs(&request_desc) + .expect("Failed to collect struct defs") + { + env.add_struct(def); + } // Create a CEL timestamp: 2024-05-16 12:00:00 UTC (1715875200 seconds, 123456789 nanos) let dt: DateTime = DateTime::from_timestamp(1715875200, 123456789) From 66e05aa0464b5187b2737dd973898712f731e0fb Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 9 Jun 2026 16:57:44 +0100 Subject: [PATCH 10/12] Ensure predicates test use correct cel::Context Signed-off-by: Adam Cattermole --- src/data/cel.rs | 74 +++++++++++++++-------- src/kuadrant/pipeline/blueprint.rs | 4 +- src/kuadrant/pipeline/tasks/dynamic.rs | 5 +- src/kuadrant/pipeline/tasks/headers.rs | 3 +- src/kuadrant/pipeline/tasks/send_reply.rs | 3 +- src/kuadrant/pipeline/tasks/store.rs | 6 +- 6 files changed, 60 insertions(+), 35 deletions(-) diff --git a/src/data/cel.rs b/src/data/cel.rs index 5880cc01..35add1ad 100644 --- a/src/data/cel.rs +++ b/src/data/cel.rs @@ -534,16 +534,7 @@ impl Predicate { }) } - pub fn test(&self, req_ctx: &ReqRespCtx) -> PredicateResult { - let mut cel_ctx = Context::default(); - self.test_with_ctx(req_ctx, &mut cel_ctx) - } - - pub fn test_with_ctx( - &self, - req_ctx: &ReqRespCtx, - cel_ctx: &mut Context<'_>, - ) -> PredicateResult { + pub fn test(&self, req_ctx: &ReqRespCtx, cel_ctx: &mut Context<'_>) -> PredicateResult { match self.expression.eval(req_ctx, cel_ctx) { Ok(AttributeState::Pending) => Ok(AttributeState::Pending), Ok(AttributeState::Available(value)) => match value { @@ -592,8 +583,9 @@ impl PredicateVec for Vec { .collect(); req_ctx.ensure_attributes(&paths); + let mut cel_ctx = Context::default(); for predicate in self.iter() { - match predicate.test(req_ctx)? { + match predicate.test(req_ctx, &mut cel_ctx)? { AttributeState::Pending => { return Ok(AttributeState::Pending); } @@ -1091,7 +1083,7 @@ mod tests { use crate::kuadrant::MockWasmHost; use crate::kuadrant::ReqRespCtx; use cel::objects::ValueType; - use cel::Value; + use cel::{Context, Value}; use std::collections::HashMap; use std::sync::Arc; @@ -1102,7 +1094,9 @@ mod tests { let ctx = ReqRespCtx::new(Arc::new(mock_host)); let predicate = Predicate::new("source.port == 65432").expect("This is valid CEL!"); assert_eq!( - predicate.test(&ctx).expect("This must evaluate properly!"), + predicate + .test(&ctx, &mut Context::default()) + .expect("This must evaluate properly!"), AttributeState::Available(true) ); } @@ -1239,7 +1233,9 @@ mod tests { ) .expect("This is valid!"); assert_eq!( - predicate.test(&ctx).expect("This must evaluate properly!"), + predicate + .test(&ctx, &mut Context::default()) + .expect("This must evaluate properly!"), AttributeState::Available(true) ); @@ -1253,7 +1249,9 @@ mod tests { ) .expect("This is valid!"); assert_eq!( - predicate.test(&ctx).expect("This must evaluate properly!"), + predicate + .test(&ctx, &mut Context::default()) + .expect("This must evaluate properly!"), AttributeState::Available(true) ); @@ -1263,7 +1261,9 @@ mod tests { let predicate = Predicate::route_rule("queryMap(request.query) == {'👾': ''}").expect("This is valid!"); assert_eq!( - predicate.test(&ctx).expect("This must evaluate properly!"), + predicate + .test(&ctx, &mut Context::default()) + .expect("This must evaluate properly!"), AttributeState::Available(true) ); } @@ -1394,7 +1394,10 @@ mod tests { "'👾' in queryMap(request.query) ? queryMap(request.query)['👾'] == '123' : false", ) .expect("This is valid!"); - assert_eq!(predicate.test(&ctx), Ok(AttributeState::Available(true))); + assert_eq!( + predicate.test(&ctx, &mut Context::default()), + Ok(AttributeState::Available(true)) + ); let headers = vec![ ("X-Auth".to_string(), "kuadrant".to_string()), @@ -1404,7 +1407,10 @@ mod tests { let ctx = ReqRespCtx::new(Arc::new(mock_host)); let predicate = Predicate::route_rule("request.headers.exists(h, h.lowerAscii() == 'x-auth' && request.headers[h] == 'kuadrant')").expect("This is valid!"); - assert_eq!(predicate.test(&ctx), Ok(AttributeState::Available(true))); + assert_eq!( + predicate.test(&ctx, &mut Context::default()), + Ok(AttributeState::Available(true)) + ); } #[test] @@ -1474,7 +1480,9 @@ mod tests { let ctx = ReqRespCtx::new(Arc::new(mock_host)); let predicate = Predicate::new("source.port == 65432").expect("This is valid CEL!"); - let result = predicate.test(&ctx).expect("Test should succeed"); + let result = predicate + .test(&ctx, &mut Context::default()) + .expect("Test should succeed"); assert_eq!(result, AttributeState::Pending); } @@ -1603,7 +1611,9 @@ mod tests { let ctx = ReqRespCtx::new(Arc::new(mock_host)); let predicate = Predicate::new("request.grpc.service == 'UserService'").expect("valid CEL"); assert_eq!( - predicate.test(&ctx).expect("must evaluate"), + predicate + .test(&ctx, &mut Context::default()) + .expect("must evaluate"), AttributeState::Available(true) ); } @@ -1623,7 +1633,9 @@ mod tests { let ctx = ReqRespCtx::new(Arc::new(mock_host)); let predicate = Predicate::new("request.grpc.method == 'GetUser'").expect("valid CEL"); assert_eq!( - predicate.test(&ctx).expect("must evaluate"), + predicate + .test(&ctx, &mut Context::default()) + .expect("must evaluate"), AttributeState::Available(true) ); } @@ -1640,7 +1652,9 @@ mod tests { let ctx = ReqRespCtx::new(Arc::new(mock_host)); let predicate = Predicate::new("has(request.grpc)").expect("valid CEL"); assert_eq!( - predicate.test(&ctx).expect("must evaluate"), + predicate + .test(&ctx, &mut Context::default()) + .expect("must evaluate"), AttributeState::Available(true) ); } @@ -1654,7 +1668,9 @@ mod tests { let ctx = ReqRespCtx::new(Arc::new(mock_host)); let predicate = Predicate::new("has(request.grpc)").expect("valid CEL"); assert_eq!( - predicate.test(&ctx).expect("must evaluate"), + predicate + .test(&ctx, &mut Context::default()) + .expect("must evaluate"), AttributeState::Available(false) ); } @@ -1668,7 +1684,9 @@ mod tests { let ctx = ReqRespCtx::new(Arc::new(mock_host)); let predicate = Predicate::new("has(request.grpc)").expect("valid CEL"); assert_eq!( - predicate.test(&ctx).expect("must evaluate"), + predicate + .test(&ctx, &mut Context::default()) + .expect("must evaluate"), AttributeState::Available(false) ); } @@ -1687,7 +1705,9 @@ mod tests { Predicate::new("has(request.grpc) && request.grpc.service == 'UserService'") .expect("valid CEL"); assert_eq!( - predicate.test(&ctx).expect("must evaluate"), + predicate + .test(&ctx, &mut Context::default()) + .expect("must evaluate"), AttributeState::Available(true) ); } @@ -1703,7 +1723,9 @@ mod tests { Predicate::new("has(request.grpc) && request.grpc.service == 'UserService'") .expect("valid CEL"); assert_eq!( - predicate.test(&ctx).expect("must evaluate"), + predicate + .test(&ctx, &mut Context::default()) + .expect("must evaluate"), AttributeState::Available(false) ); } diff --git a/src/kuadrant/pipeline/blueprint.rs b/src/kuadrant/pipeline/blueprint.rs index 85075519..2af41d7d 100644 --- a/src/kuadrant/pipeline/blueprint.rs +++ b/src/kuadrant/pipeline/blueprint.rs @@ -148,8 +148,8 @@ impl Action { } } } - Operation::Fail { log_message } => { - tracing::error!("Fail operation: Action {}: {}", self.id, log_message); + Operation::Fail { log_message: _ } => { + // todo(@adam-cattermole): Do something with the failure operation None } } diff --git a/src/kuadrant/pipeline/tasks/dynamic.rs b/src/kuadrant/pipeline/tasks/dynamic.rs index d1688d2d..2a916e2c 100644 --- a/src/kuadrant/pipeline/tasks/dynamic.rs +++ b/src/kuadrant/pipeline/tasks/dynamic.rs @@ -50,8 +50,8 @@ impl DynamicTask { } fn warm(&self, ctx: &mut ReqRespCtx) { - let _ = self.predicate.test(ctx); let mut cel_ctx = ctx.cel.new_ctx(self); + let _ = self.predicate.test(ctx, &mut cel_ctx); let _ = self.message_builder.eval(ctx, &mut cel_ctx); } } @@ -84,7 +84,8 @@ impl Task for DynamicTask { } fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { - match self.predicate.test(ctx) { + let mut cel_ctx = ctx.cel.new_ctx(&*self); + match self.predicate.test(ctx, &mut cel_ctx) { Ok(AttributeState::Pending) => { return if ctx.response_body.is_end_of_stream() { TaskOutcome::Failed diff --git a/src/kuadrant/pipeline/tasks/headers.rs b/src/kuadrant/pipeline/tasks/headers.rs index 5c16274b..16bf0103 100644 --- a/src/kuadrant/pipeline/tasks/headers.rs +++ b/src/kuadrant/pipeline/tasks/headers.rs @@ -95,7 +95,8 @@ impl Task for ModifyHeadersTask { fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { if let Some(ref predicate) = self.predicate { - match predicate.test(ctx) { + let mut cel_ctx = ctx.cel.new_ctx(&*self); + match predicate.test(ctx, &mut cel_ctx) { Ok(AttributeState::Available(true)) => {} Ok(AttributeState::Available(false)) => return TaskOutcome::Done, Ok(AttributeState::Pending) => { diff --git a/src/kuadrant/pipeline/tasks/send_reply.rs b/src/kuadrant/pipeline/tasks/send_reply.rs index 5ac79a33..11209c25 100644 --- a/src/kuadrant/pipeline/tasks/send_reply.rs +++ b/src/kuadrant/pipeline/tasks/send_reply.rs @@ -109,7 +109,8 @@ impl Task for SendReplyTask { #[tracing::instrument(name = "send_reply", skip(self, ctx))] fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { if let Some(ref predicate) = self.predicate { - match predicate.test(ctx) { + let mut cel_ctx = ctx.cel.new_ctx(&*self); + match predicate.test(ctx, &mut cel_ctx) { Ok(AttributeState::Available(true)) => {} Ok(AttributeState::Available(false)) => return TaskOutcome::Done, Ok(AttributeState::Pending) => { diff --git a/src/kuadrant/pipeline/tasks/store.rs b/src/kuadrant/pipeline/tasks/store.rs index aa331981..9547eb9f 100644 --- a/src/kuadrant/pipeline/tasks/store.rs +++ b/src/kuadrant/pipeline/tasks/store.rs @@ -142,8 +142,10 @@ impl Task for StoreTask { parser.populate(body_ctx_mut); } + let mut cel_ctx = ctx.cel.new_ctx(&*self); + if let Some(ref predicate) = self.predicate { - match predicate.test(ctx) { + match predicate.test(ctx, &mut cel_ctx) { Ok(AttributeState::Available(true)) => {} Ok(AttributeState::Available(false)) => return TaskOutcome::Done, Ok(AttributeState::Pending) => { @@ -155,8 +157,6 @@ impl Task for StoreTask { } } } - - let mut cel_ctx = ctx.cel.new_ctx(&*self); let value = match self.expression.eval(ctx, &mut cel_ctx) { Ok(AttributeState::Pending) => { return TaskOutcome::Requeued(vec![self]); From 1ff92da0ba4b3f0d9fbf18c57332d5bbab54fc16 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 9 Jun 2026 17:07:47 +0100 Subject: [PATCH 11/12] Add FailTask Signed-off-by: Adam Cattermole --- src/kuadrant/pipeline/blueprint.rs | 11 ++++-- src/kuadrant/pipeline/tasks/fail.rs | 58 +++++++++++++++++++++++++++++ src/kuadrant/pipeline/tasks/mod.rs | 2 + 3 files changed, 68 insertions(+), 3 deletions(-) create mode 100644 src/kuadrant/pipeline/tasks/fail.rs diff --git a/src/kuadrant/pipeline/blueprint.rs b/src/kuadrant/pipeline/blueprint.rs index 2af41d7d..865519a4 100644 --- a/src/kuadrant/pipeline/blueprint.rs +++ b/src/kuadrant/pipeline/blueprint.rs @@ -148,9 +148,14 @@ impl Action { } } } - Operation::Fail { log_message: _ } => { - // todo(@adam-cattermole): Do something with the failure operation - None + Operation::Fail { log_message } => { + use crate::kuadrant::pipeline::tasks::FailTask; + Some(Box::new(FailTask::new( + self.id.clone(), + self.predicate.clone(), + log_message.clone(), + self.terminal, + ))) } } } diff --git a/src/kuadrant/pipeline/tasks/fail.rs b/src/kuadrant/pipeline/tasks/fail.rs new file mode 100644 index 00000000..2870f294 --- /dev/null +++ b/src/kuadrant/pipeline/tasks/fail.rs @@ -0,0 +1,58 @@ +use tracing::error; + +use crate::data::attribute::AttributeState; +use crate::data::cel::Predicate; +use crate::kuadrant::pipeline::tasks::{SendReplyTask, Task, TaskOutcome}; +use crate::kuadrant::ReqRespCtx; +use crate::metrics::METRICS; + +pub struct FailTask { + task_id: String, + predicate: Predicate, + log_message: String, + terminal: bool, +} + +impl FailTask { + pub fn new(task_id: String, predicate: Predicate, log_message: String, terminal: bool) -> Self { + Self { + task_id, + predicate, + log_message, + terminal, + } + } +} + +impl Task for FailTask { + fn id(&self) -> &str { + &self.task_id + } + + fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { + let mut cel_ctx = ctx.cel.new_ctx(&*self); + match self.predicate.test(ctx, &mut cel_ctx) { + Ok(AttributeState::Available(true)) => { + error!("Action failure: {}", self.log_message); + if self.terminal { + METRICS.errors().increment(); + TaskOutcome::Terminate(Box::new(SendReplyTask::default())) + } else { + TaskOutcome::Done + } + } + Ok(AttributeState::Available(false)) => TaskOutcome::Done, + Ok(AttributeState::Pending) => { + if ctx.response_body.is_end_of_stream() { + TaskOutcome::Failed + } else { + TaskOutcome::Requeued(vec![self]) + } + } + Err(e) => { + error!("Failed to evaluate log task predicate: {e:?}"); + TaskOutcome::Failed + } + } + } +} diff --git a/src/kuadrant/pipeline/tasks/mod.rs b/src/kuadrant/pipeline/tasks/mod.rs index 488b5562..5135688b 100644 --- a/src/kuadrant/pipeline/tasks/mod.rs +++ b/src/kuadrant/pipeline/tasks/mod.rs @@ -1,5 +1,6 @@ mod dynamic; mod export_traces; +mod fail; mod failure_mode; mod headers; mod send_reply; @@ -9,6 +10,7 @@ mod tracing_decorator; pub use dynamic::DynamicTask; pub use export_traces::ExportTracesTask; +pub use fail::FailTask; pub use failure_mode::FailureModeTask; pub use headers::{HeaderOperation, HeadersType, ModifyHeadersTask}; pub use send_reply::SendReplyTask; From 49318aeedd1d183c9663c43237c6e3522c26ae27 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 9 Jun 2026 17:55:57 +0100 Subject: [PATCH 12/12] Unify send and headers to have single CEL based impl Signed-off-by: Adam Cattermole --- src/kuadrant/pipeline/blueprint.rs | 20 ++-- src/kuadrant/pipeline/tasks/dynamic.rs | 26 +---- src/kuadrant/pipeline/tasks/headers.rs | 132 ++++++++-------------- src/kuadrant/pipeline/tasks/mod.rs | 2 +- src/kuadrant/pipeline/tasks/send_reply.rs | 125 +++++++------------- 5 files changed, 102 insertions(+), 203 deletions(-) diff --git a/src/kuadrant/pipeline/blueprint.rs b/src/kuadrant/pipeline/blueprint.rs index 865519a4..6c13815b 100644 --- a/src/kuadrant/pipeline/blueprint.rs +++ b/src/kuadrant/pipeline/blueprint.rs @@ -5,8 +5,8 @@ use crate::configuration::{ }; use crate::data::{cel::Predicate, Expression}; use crate::kuadrant::pipeline::tasks::{ - DynamicTask, ExportTracesTask, FailureModeTask, HeaderOperation, HeadersType, - ModifyHeadersTask, Task, TeardownAction, TokenUsageTask, TracingDecoratorTask, + DynamicTask, ExportTracesTask, FailureModeTask, HeadersType, ModifyHeadersTask, Task, + TeardownAction, TokenUsageTask, TracingDecoratorTask, }; use crate::kuadrant::ReqRespCtx; use crate::services::ServiceInstance; @@ -93,19 +93,25 @@ impl Action { } ServiceInstance::Tracing(_) => { ctx.set_public_tracker_id(var.clone()); + #[allow(clippy::expect_used)] + let predicate = Predicate::new("true").expect("Needs to be valid!"); + #[allow(clippy::expect_used)] + let headers_expr = + Expression::new(&format!("[['{var}', '{}']]", ctx.request_id())) + .expect("Needs to be valid CEL!"); Some(Box::new(ModifyHeadersTask::new( self.id.clone(), - HeaderOperation::Append( - vec![(var.clone(), ctx.request_id().to_string())].into(), - ), + predicate, + headers_expr, HeadersType::HttpResponseHeaders, + false, ))) } } } Operation::Deny { deny_with } => { use crate::kuadrant::pipeline::tasks::SendReplyTask; - Some(Box::new(SendReplyTask::new_deferred( + Some(Box::new(SendReplyTask::new( self.id.clone(), self.predicate.clone(), deny_with.clone(), @@ -115,7 +121,7 @@ impl Action { Operation::Headers { target, headers: headers_expr, - } => Some(Box::new(ModifyHeadersTask::new_deferred( + } => Some(Box::new(ModifyHeadersTask::new( self.id.clone(), self.predicate.clone(), headers_expr.clone(), diff --git a/src/kuadrant/pipeline/tasks/dynamic.rs b/src/kuadrant/pipeline/tasks/dynamic.rs index 2a916e2c..0b6e6694 100644 --- a/src/kuadrant/pipeline/tasks/dynamic.rs +++ b/src/kuadrant/pipeline/tasks/dynamic.rs @@ -208,29 +208,5 @@ fn process_dynamic_response( ctx.cel .add_scoped_binding(task_id, name.to_string(), cel_value); - let mut remaining_tasks: Vec> = Vec::new(); - for child in on_reply { - match child.apply(ctx) { - TaskOutcome::Done => {} - TaskOutcome::Terminate(t) => return TaskOutcome::Terminate(t), - TaskOutcome::Deferred { - token_id: _, - pending: _, - } => { - todo!("(@adam-cattermole): this is new and yet to be supported - deferred tasks generating deferred tasks") - } - TaskOutcome::Requeued(mut requeued) => { - remaining_tasks.append(&mut requeued); - } - TaskOutcome::Failed => { - return TaskOutcome::Failed; - } - } - } - - if remaining_tasks.is_empty() { - TaskOutcome::Done - } else { - TaskOutcome::Requeued(remaining_tasks) - } + TaskOutcome::Requeued(on_reply) } diff --git a/src/kuadrant/pipeline/tasks/headers.rs b/src/kuadrant/pipeline/tasks/headers.rs index 16bf0103..bf9f13b6 100644 --- a/src/kuadrant/pipeline/tasks/headers.rs +++ b/src/kuadrant/pipeline/tasks/headers.rs @@ -29,49 +29,17 @@ impl From<&HeadersType> for Path { } } -enum HeadersMode { - Concrete { operation: HeaderOperation }, - Deferred { headers_expr: Expression }, -} - #[derive(Clone)] pub struct ModifyHeadersTask { task_id: String, - predicate: Option, - mode: HeadersMode, + predicate: Predicate, + headers_expr: Expression, target: HeadersType, terminal: bool, } -impl Clone for HeadersMode { - fn clone(&self) -> Self { - match self { - HeadersMode::Concrete { operation } => HeadersMode::Concrete { - operation: operation.clone(), - }, - HeadersMode::Deferred { headers_expr } => HeadersMode::Deferred { - headers_expr: headers_expr.clone(), - }, - } - } -} - impl ModifyHeadersTask { pub fn new( - task_id: String, - operation: HeaderOperation, - target: HeadersType, - ) -> ModifyHeadersTask { - ModifyHeadersTask { - task_id, - predicate: None, - mode: HeadersMode::Concrete { operation }, - target, - terminal: false, - } - } - - pub fn new_deferred( task_id: String, predicate: Predicate, headers_expr: Expression, @@ -80,8 +48,8 @@ impl ModifyHeadersTask { ) -> Self { Self { task_id, - predicate: Some(predicate), - mode: HeadersMode::Deferred { headers_expr }, + predicate, + headers_expr, target, terminal, } @@ -94,42 +62,34 @@ impl Task for ModifyHeadersTask { } fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { - if let Some(ref predicate) = self.predicate { - let mut cel_ctx = ctx.cel.new_ctx(&*self); - match predicate.test(ctx, &mut cel_ctx) { - Ok(AttributeState::Available(true)) => {} - Ok(AttributeState::Available(false)) => return TaskOutcome::Done, - Ok(AttributeState::Pending) => { - return TaskOutcome::Requeued(vec![self]); - } - Err(e) => { - error!("Failed to evaluate predicate: {e:?}"); - return TaskOutcome::Failed; - } + let mut cel_ctx = ctx.cel.new_ctx(&*self); + match self.predicate.test(ctx, &mut cel_ctx) { + Ok(AttributeState::Available(true)) => {} + Ok(AttributeState::Available(false)) => return TaskOutcome::Done, + Ok(AttributeState::Pending) => { + return TaskOutcome::Requeued(vec![self]); + } + Err(e) => { + error!("Failed to evaluate predicate: {e:?}"); + return TaskOutcome::Failed; } } - let operation = match &self.mode { - HeadersMode::Concrete { operation } => operation.clone(), - HeadersMode::Deferred { headers_expr } => { - let mut cel_ctx = ctx.cel.new_ctx(&*self); - match headers_expr.eval(ctx, &mut cel_ctx) { - Ok(AttributeState::Pending) => { - error!("Unexpected pending state in headers expression"); - return TaskOutcome::Failed; - } - Ok(AttributeState::Available(ref val)) => { - let pairs = cel_value_to_header_pairs(val); - if pairs.is_empty() { - return TaskOutcome::Done; - } - HeaderOperation::Set(pairs.into()) - } - Err(e) => { - error!("Failed to evaluate headers expression: {e}"); - return TaskOutcome::Failed; - } + let operation = match self.headers_expr.eval(ctx, &mut cel_ctx) { + Ok(AttributeState::Pending) => { + error!("Unexpected pending state in headers expression"); + return TaskOutcome::Failed; + } + Ok(AttributeState::Available(ref val)) => { + let pairs = cel_value_to_header_pairs(val); + if pairs.is_empty() { + return TaskOutcome::Done; } + HeaderOperation::Set(pairs.into()) + } + Err(e) => { + error!("Failed to evaluate headers expression: {e}"); + return TaskOutcome::Failed; } }; @@ -188,6 +148,9 @@ impl Task for ModifyHeadersTask { #[cfg(test)] mod tests { use super::*; + use crate::data::attribute::Path; + use crate::data::cel::Predicate; + use crate::data::Expression; use crate::kuadrant::MockWasmHost; use std::sync::Arc; @@ -199,12 +162,15 @@ mod tests { let backend = Arc::new(mock_host); let mut ctx = ReqRespCtx::new(backend); - let new_headers: Headers = vec![("New-Key".to_string(), "New-Value".to_string())].into(); + let predicate = Predicate::new("true").unwrap(); + let headers_expr = Expression::new("[['New-Key', 'New-Value']]").unwrap(); let task = Box::new(ModifyHeadersTask::new( "0".to_string(), - HeaderOperation::Append(new_headers), + predicate, + headers_expr, HeadersType::HttpRequestHeaders, + false, )); let outcome = task.apply(&mut ctx); @@ -232,13 +198,15 @@ mod tests { let backend = Arc::new(mock_host); let mut ctx = ReqRespCtx::new(backend); - let new_headers: Headers = - vec![("Content-Type".to_string(), "application/json".to_string())].into(); + let predicate = Predicate::new("true").unwrap(); + let headers_expr = Expression::new("[['Content-Type', 'application/json']]").unwrap(); let task = Box::new(ModifyHeadersTask::new( "0".to_string(), - HeaderOperation::Set(new_headers), + predicate, + headers_expr, HeadersType::HttpRequestHeaders, + false, )); let outcome = task.apply(&mut ctx); @@ -256,7 +224,7 @@ mod tests { } #[test] - fn remove_headers_task() { + fn empty_headers_expr_returns_done() { let existing_headers = vec![ ("API-Key-To-Remove".to_string(), "API-Value".to_string()), ("X-Origin".to_string(), "Kuadrant".to_string()), @@ -266,24 +234,18 @@ mod tests { let backend = Arc::new(mock_host); let mut ctx = ReqRespCtx::new(backend); - let keys_to_remove = vec!["API-Key-To-Remove".to_string()]; + let predicate = Predicate::new("true").unwrap(); + let headers_expr = Expression::new("[]").unwrap(); let task = Box::new(ModifyHeadersTask::new( "0".to_string(), - HeaderOperation::Remove(keys_to_remove), + predicate, + headers_expr, HeadersType::HttpResponseHeaders, + false, )); let outcome = task.apply(&mut ctx); assert!(matches!(outcome, TaskOutcome::Done)); - - let result: Result>, _> = - ctx.get_attribute_ref(&Path::from(&HeadersType::HttpResponseHeaders)); - - assert!(matches!(result, Ok(AttributeState::Available(Some(_))))); - if let Ok(AttributeState::Available(Some(headers))) = result { - assert_eq!(headers.len(), 1); - assert_eq!(headers.get("X-Origin"), Some("Kuadrant")); - } } } diff --git a/src/kuadrant/pipeline/tasks/mod.rs b/src/kuadrant/pipeline/tasks/mod.rs index 5135688b..4a734712 100644 --- a/src/kuadrant/pipeline/tasks/mod.rs +++ b/src/kuadrant/pipeline/tasks/mod.rs @@ -12,7 +12,7 @@ pub use dynamic::DynamicTask; pub use export_traces::ExportTracesTask; pub use fail::FailTask; pub use failure_mode::FailureModeTask; -pub use headers::{HeaderOperation, HeadersType, ModifyHeadersTask}; +pub use headers::{HeadersType, ModifyHeadersTask}; pub use send_reply::SendReplyTask; pub use store::StoreTask; pub use token_usage::TokenUsageTask; diff --git a/src/kuadrant/pipeline/tasks/send_reply.rs b/src/kuadrant/pipeline/tasks/send_reply.rs index 11209c25..bc6f0558 100644 --- a/src/kuadrant/pipeline/tasks/send_reply.rs +++ b/src/kuadrant/pipeline/tasks/send_reply.rs @@ -12,38 +12,13 @@ use crate::services::{cel_value_to_header_pairs, deny_response_struct_def}; pub struct SendReplyTask { task_id: String, - predicate: Option, + predicate: Predicate, deny_with: Expression, terminal: bool, } impl SendReplyTask { pub fn new( - task_id: String, - status_code: u32, - headers: Vec<(String, String)>, - body: Option, - ) -> Self { - let headers = headers - .into_iter() - .map(|(h, v)| format!("['''{h}''', '''{v}''']")) - .collect::>() - .join(", "); - let body_field = body.map(|b| format!("body: '''{b}'''")).unwrap_or_default(); - let expr = format!( - "DenyResponse {{ status: {status_code}u, headers: [{headers}], {body_field} }}" - ); - #[allow(clippy::expect_used)] - let deny_with = Expression::new(&expr).expect("Needs to be valid CEL!"); - Self { - task_id, - predicate: None, - deny_with, - terminal: false, - } - } - - pub fn new_deferred( task_id: String, predicate: Predicate, deny_with: Expression, @@ -51,49 +26,26 @@ impl SendReplyTask { ) -> Self { Self { task_id, - predicate: Some(predicate), + predicate, deny_with, terminal, } } pub fn default() -> Self { - Self::new( - "default".to_string(), - 500, - Vec::new(), - Some("Internal Server Error.\n".to_string()), + #[allow(clippy::expect_used)] + let deny_with = Expression::new( + r#"DenyResponse { status: 500u, headers: [], body: 'Internal Server Error.\n' }"#, ) - } -} - -impl TryFrom for SendReplyTask { - type Error = String; - - fn try_from(value: Value) -> Result { - let Value::Struct(deny_response) = value else { - return Err(format!("expected DenyResponse struct, got: {value:?}")); - }; - - let status = deny_response - .field_value("status") - .and_then(|v| v.downcast_ref::()) - .map(|v| *v.inner() as u32) - .ok_or("DenyResponse missing or invalid 'status' field")?; - - let body = deny_response - .field_value("body") - .and_then(|v| v.downcast_ref::()) - .map(|v| v.inner().to_string()) - .filter(|s| !s.is_empty()); - - let headers = deny_response - .field_value("headers") - .and_then(|v| Value::try_from(v).ok()) - .map(|v| cel_value_to_header_pairs(&v)) - .unwrap_or_default(); - - Ok(Self::new("from_value".to_string(), status, headers, body)) + .expect("Needs to be valid CEL!"); + #[allow(clippy::expect_used)] + let predicate = Predicate::new("true").expect("Needs to be valid!"); + Self { + task_id: "default".to_string(), + predicate, + deny_with, + terminal: false, + } } } @@ -108,23 +60,20 @@ impl Task for SendReplyTask { #[tracing::instrument(name = "send_reply", skip(self, ctx))] fn apply(self: Box, ctx: &mut ReqRespCtx) -> TaskOutcome { - if let Some(ref predicate) = self.predicate { - let mut cel_ctx = ctx.cel.new_ctx(&*self); - match predicate.test(ctx, &mut cel_ctx) { - Ok(AttributeState::Available(true)) => {} - Ok(AttributeState::Available(false)) => return TaskOutcome::Done, - Ok(AttributeState::Pending) => { - return TaskOutcome::Requeued(vec![self]); - } - Err(e) => { - error!("Failed to evaluate predicate: {e:?}"); - return TaskOutcome::Failed; - } + let mut cel_ctx = ctx.cel.new_ctx(&*self); + match self.predicate.test(ctx, &mut cel_ctx) { + Ok(AttributeState::Available(true)) => {} + Ok(AttributeState::Available(false)) => return TaskOutcome::Done, + Ok(AttributeState::Pending) => { + return TaskOutcome::Requeued(vec![self]); + } + Err(e) => { + error!("Failed to evaluate predicate: {e:?}"); + return TaskOutcome::Failed; } } let (status_code, headers, body) = { - let mut cel_ctx = ctx.cel.new_ctx(&*self); match self.deny_with.eval(ctx, &mut cel_ctx) { Ok(AttributeState::Pending) => { error!("Unexpected pending state in deny expression"); @@ -204,17 +153,15 @@ mod tests { let mock_host = MockWasmHost::new(); let mut ctx = ReqRespCtx::new(Arc::new(mock_host)); + let predicate = Predicate::new("true").unwrap(); + let deny_with = Expression::new( + "DenyResponse { status: 403u, headers: [['content-type', 'text/plain'], ['WWW-Authenticate', 'APIKEY realm=\"api-key-users\"']], body: 'Access Denied' }" + ).unwrap(); let task = Box::new(SendReplyTask::new( "0".to_string(), - 403, - vec![ - ("content-type".to_string(), "text/plain".to_string()), - ( - "WWW-Authenticate".to_string(), - "APIKEY realm=\"api-key-users\"".to_string(), - ), - ], - Some("Access Denied".to_string()), + predicate, + deny_with, + false, )); let outcome = task.apply(&mut ctx); @@ -226,7 +173,15 @@ mod tests { let mock_host = MockWasmHost::new(); let mut ctx = ReqRespCtx::new(Arc::new(mock_host)); - let task = Box::new(SendReplyTask::new("0".to_string(), 429, vec![], None)); + let predicate = Predicate::new("true").unwrap(); + let deny_with = + Expression::new("DenyResponse { status: 429u, headers: [], body: '' }").unwrap(); + let task = Box::new(SendReplyTask::new( + "0".to_string(), + predicate, + deny_with, + false, + )); let outcome = task.apply(&mut ctx); assert!(matches!(outcome, TaskOutcome::Done));