Skip to content

Unified CEL context and on_reply become Tasks#377

Draft
adam-cattermole wants to merge 12 commits into
task-dependenciesfrom
cel-context
Draft

Unified CEL context and on_reply become Tasks#377
adam-cattermole wants to merge 12 commits into
task-dependenciesfrom
cel-context

Conversation

@adam-cattermole

@adam-cattermole adam-cattermole commented Jun 9, 2026

Copy link
Copy Markdown
Member

Summary by CodeRabbit

  • Refactor
    • Improved internal architecture for expression language evaluation and task execution.
    • Enhanced context management for more efficient predicate processing across the pipeline.
    • Streamlined task identifier handling and scope management throughout the execution framework.
    • Optimised task initialisation and execution efficiency.

Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
Signed-off-by: Adam Cattermole <a.d.cattermole@gmail.com>
@coderabbitai

coderabbitai Bot commented Jun 9, 2026

Copy link
Copy Markdown

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0a7a01a9-c5cf-4a73-8090-3f631cfa1c92

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • ✅ Review completed - (🔄 Check again to review again)
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch cel-context

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@adam-cattermole adam-cattermole self-assigned this Jun 9, 2026

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/kuadrant/pipeline/tasks/send_reply.rs (1)

77-80: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Requeue pending deny_with expressions.

Expression::eval() legitimately returns Pending for body- and host-backed attributes. Failing on Line 78 makes valid CEL-driven replies impossible whenever the response payload depends on data that arrives later.

Suggested fix
             match self.deny_with.eval(ctx, &mut cel_ctx) {
                 Ok(AttributeState::Pending) => {
-                    error!("Unexpected pending state in deny expression");
-                    return TaskOutcome::Failed;
+                    return TaskOutcome::Requeued(vec![self]);
                 }

Based on learnings: Pending in these tasks is not limited to stream-body data, and Expression::eval() returns Pending for unresolved attributes.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/kuadrant/pipeline/tasks/send_reply.rs` around lines 77 - 80, The code
currently treats AttributeState::Pending from self.deny_with.eval(...) as an
error and returns TaskOutcome::Failed; instead, detect AttributeState::Pending
in the match and return TaskOutcome::Requeue (or the pipeline's retry outcome)
so CEL-driven replies that depend on later-resolved attributes are retried;
update the match arm for AttributeState::Pending in send_reply.rs (the block
handling self.deny_with.eval and any similar eval calls) to stop logging it as
an error and return the requeue outcome so the task is retried when attributes
become available.

Source: Learnings

🧹 Nitpick comments (2)
src/services/dynamic/converters.rs (1)

66-68: ⚡ Quick win

Update the rustdoc to match the new API contract.

collect_struct_defs no longer registers anything into an Env; it returns Vec<StructDef> for the caller to add. The current comment now describes the old side effect and is misleading on a newly public helper.

Suggested doc update
-    /// Register a message descriptor and all its nested message types with the CEL environment
-    /// This must be called before evaluating CEL expressions that construct these messages
+    /// Collect struct definitions for a message descriptor and all nested message types.
+    /// Callers must add the returned defs to the CEL environment before evaluating
+    /// CEL expressions that construct these messages.
     pub fn collect_struct_defs(
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/services/dynamic/converters.rs` around lines 66 - 68, Update the rustdoc
for the public function collect_struct_defs to reflect the new API contract:
remove claims that it registers descriptors into an Env and instead state that
it returns a Vec<StructDef> containing the message descriptor and its nested
message types for the caller to add to their CEL environment; mention it must be
called before evaluating CEL expressions that construct these messages and that
the caller is responsible for registering the returned StructDef values into the
Env.
src/kuadrant/pipeline/tasks/send_reply.rs (1)

151-188: 🏗️ Heavy lift

Please cover the actual reply hostcall here.

These tests only assert TaskOutcome::Done, so they do not prove that the computed status, headers, or body were sent correctly. The repository guideline for Rust tests is to use proxy-wasm-test-framework for Envoy hostcall mocking and header verification.

As per coding guidelines: use proxy-wasm-test-framework for mocking Envoy hostcalls in tests, including mock service responses and header verification.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/kuadrant/pipeline/tasks/send_reply.rs` around lines 151 - 188, Update the
two tests (test_send_reply_task_success and test_send_reply_task_no_body) to
assert the actual Envoy hostcall behavior instead of only TaskOutcome::Done:
replace or augment MockWasmHost/ReqRespCtx usage with proxy-wasm-test-framework
mocks that capture and verify the outgoing response hostcall invoked by
SendReplyTask::apply, asserting the expected status, headers and body for the
deny_with expressions; ensure you verify the specific hostcall name (the send
reply/send_http_response hostcall used by SendReplyTask) and its parameters
rather than only checking TaskOutcome.

Source: Coding guidelines

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/data/cel.rs`:
- Around line 586-588: The code is reusing a single mutable CEL Context across
multiple predicate evaluations which allows earlier evaluations to leave
bindings (e.g., queryMap) that affect later predicates; change the loop so each
predicate gets a fresh Context::default() (i.e., create a new Context inside the
for loop before calling predicate.test) instead of creating one outside the
loop, ensuring predicate.test(req_ctx, &mut cel_ctx) is invoked with a newly
created context for each predicate.

In `@src/kuadrant/context.rs`:
- Around line 540-552: Currently you insert task_id into self.registered even
when Arc::get_mut(&mut self.env) fails; change the logic so task_id is only
inserted when CEL type registration actually succeeds. Specifically, for the
block that checks !self.registered.contains(task_id) and then calls
task.cel_types(), if types is empty insert immediately; otherwise attempt
Arc::get_mut(&mut self.env) and only after successfully iterating and calling
env.add_struct(...) for each type_def insert
self.registered.insert(task_id.to_string()); do not insert when Arc::get_mut
returns None (log and bail out so future new_ctx() calls will retry
registration).

In `@src/kuadrant/pipeline/blueprint.rs`:
- Around line 95-101: The code currently calls
Predicate::new("true").expect(...) and Expression::new(&format!("[['{var}',
'{}']]", ctx.request_id())).expect(...) inside to_core_task(), which will panic
on malformed var or request_id; change to return a Result from to_core_task()
(or propagate an existing Result) and replace the expect() calls with fallible
parsing using the ? operator so parse errors propagate instead of panicking, or
avoid reparsing by building the header expression/value programmatically (e.g.,
construct the header pair/struct directly rather than formatting CEL source) and
use the existing ctx.set_public_tracker_id(var.clone()) path; update function
signature and call sites to handle the Result accordingly and reference
Predicate::new, Expression::new, ctx.set_public_tracker_id, and to_core_task()
when making the change.

In `@src/kuadrant/pipeline/tasks/fail.rs`:
- Around line 45-49: The current match arm for AttributeState::Pending
prematurely returns TaskOutcome::Failed when
ctx.response_body.is_end_of_stream() is true; instead, change the logic in the
Pending arm so it requeues the task (TaskOutcome::Requeued(vec![self])) by
default and only return TaskOutcome::Failed when the task can prove it is
specifically waiting on the response body stream (e.g., add or check a boolean
like self.is_stream_bound() or a method like self.waits_on_body_stream());
update the match arm around AttributeState::Pending (the code referencing
ctx.response_body.is_end_of_stream(), TaskOutcome::Failed, and
TaskOutcome::Requeued(vec![self])) to requeue unless that explicit stream-bound
condition is true.

In `@src/kuadrant/pipeline/tasks/headers.rs`:
- Around line 78-81: The match arm for self.headers_expr.eval currently treats
AttributeState::Pending as an error and returns TaskOutcome::Failed; instead,
handle AttributeState::Pending the same way the predicate path does by returning
TaskOutcome::Requeue (remove or downgrade the error log), so unresolved/deferred
attributes can be retried later; update the match in headers handling (the call
to headers_expr.eval and the AttributeState::Pending arm) to return
TaskOutcome::Requeue rather than TaskOutcome::Failed.

---

Outside diff comments:
In `@src/kuadrant/pipeline/tasks/send_reply.rs`:
- Around line 77-80: The code currently treats AttributeState::Pending from
self.deny_with.eval(...) as an error and returns TaskOutcome::Failed; instead,
detect AttributeState::Pending in the match and return TaskOutcome::Requeue (or
the pipeline's retry outcome) so CEL-driven replies that depend on
later-resolved attributes are retried; update the match arm for
AttributeState::Pending in send_reply.rs (the block handling self.deny_with.eval
and any similar eval calls) to stop logging it as an error and return the
requeue outcome so the task is retried when attributes become available.

---

Nitpick comments:
In `@src/kuadrant/pipeline/tasks/send_reply.rs`:
- Around line 151-188: Update the two tests (test_send_reply_task_success and
test_send_reply_task_no_body) to assert the actual Envoy hostcall behavior
instead of only TaskOutcome::Done: replace or augment MockWasmHost/ReqRespCtx
usage with proxy-wasm-test-framework mocks that capture and verify the outgoing
response hostcall invoked by SendReplyTask::apply, asserting the expected
status, headers and body for the deny_with expressions; ensure you verify the
specific hostcall name (the send reply/send_http_response hostcall used by
SendReplyTask) and its parameters rather than only checking TaskOutcome.

In `@src/services/dynamic/converters.rs`:
- Around line 66-68: Update the rustdoc for the public function
collect_struct_defs to reflect the new API contract: remove claims that it
registers descriptors into an Env and instead state that it returns a
Vec<StructDef> containing the message descriptor and its nested message types
for the caller to add to their CEL environment; mention it must be called before
evaluating CEL expressions that construct these messages and that the caller is
responsible for registering the returned StructDef values into the Env.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: de04ca7f-2b60-4b84-9630-11f130d4690a

📥 Commits

Reviewing files that changed from the base of the PR and between fd9455e and 49318ae.

📒 Files selected for processing (18)
  • src/data/cel.rs
  • src/kuadrant/context.rs
  • src/kuadrant/pipeline/blueprint.rs
  • src/kuadrant/pipeline/executor.rs
  • src/kuadrant/pipeline/mod.rs
  • src/kuadrant/pipeline/tasks/dynamic.rs
  • src/kuadrant/pipeline/tasks/fail.rs
  • src/kuadrant/pipeline/tasks/failure_mode.rs
  • src/kuadrant/pipeline/tasks/headers.rs
  • src/kuadrant/pipeline/tasks/mod.rs
  • src/kuadrant/pipeline/tasks/send_reply.rs
  • src/kuadrant/pipeline/tasks/store.rs
  • src/kuadrant/pipeline/tasks/token_usage.rs
  • src/kuadrant/pipeline/tasks/tracing_decorator.rs
  • src/services/dynamic.rs
  • src/services/dynamic/converters.rs
  • src/services/mod.rs
  • tests/response_body.rs

Comment thread src/data/cel.rs
Comment on lines +586 to +588
let mut cel_ctx = Context::default();
for predicate in self.iter() {
match predicate.test(req_ctx)? {
match predicate.test(req_ctx, &mut cel_ctx)? {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Do not reuse one mutable CEL Context across different predicates.

Expression::eval() mutates the passed context by adding functions and variable bindings, and nothing removes them afterwards. Reusing a single Context here makes predicate evaluation order-dependent: an earlier predicate can leave queryMap or other bindings behind for a later predicate that should have been evaluated in a clean context.

Suggested fix
-        let mut cel_ctx = Context::default();
         for predicate in self.iter() {
-            match predicate.test(req_ctx, &mut cel_ctx)? {
+            let mut cel_ctx = Context::default();
+            match predicate.test(req_ctx, &mut cel_ctx)? {
                 AttributeState::Pending => {
                     return Ok(AttributeState::Pending);
                 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut cel_ctx = Context::default();
for predicate in self.iter() {
match predicate.test(req_ctx)? {
match predicate.test(req_ctx, &mut cel_ctx)? {
for predicate in self.iter() {
let mut cel_ctx = Context::default();
match predicate.test(req_ctx, &mut cel_ctx)? {
AttributeState::Pending => {
return Ok(AttributeState::Pending);
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/data/cel.rs` around lines 586 - 588, The code is reusing a single mutable
CEL Context across multiple predicate evaluations which allows earlier
evaluations to leave bindings (e.g., queryMap) that affect later predicates;
change the loop so each predicate gets a fresh Context::default() (i.e., create
a new Context inside the for loop before calling predicate.test) instead of
creating one outside the loop, ensuring predicate.test(req_ctx, &mut cel_ctx) is
invoked with a newly created context for each predicate.

Comment thread src/kuadrant/context.rs
Comment on lines +540 to +552
if !self.registered.contains(task_id) {
let types = task.cel_types();
if !types.is_empty() {
match Arc::get_mut(&mut self.env) {
Some(env) => {
for type_def in types {
env.add_struct(type_def);
}
}
None => error!("Failed to add CEL types: Arc refcount > 1"),
}
}
self.registered.insert(task_id.to_string());

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Do not mark a task as registered when CEL type registration failed.

If Arc::get_mut(&mut self.env) returns None, this path logs the error but still inserts task_id into registered. That turns a transient failure into a permanent one: every later new_ctx() call for the same task id will skip cel_types() registration, so tasks that depend on those structs will keep evaluating against an incomplete CEL environment.

Suggested fix
         if !self.registered.contains(task_id) {
             let types = task.cel_types();
+            let mut registration_succeeded = true;
             if !types.is_empty() {
                 match Arc::get_mut(&mut self.env) {
                     Some(env) => {
                         for type_def in types {
                             env.add_struct(type_def);
                         }
                     }
-                    None => error!("Failed to add CEL types: Arc refcount > 1"),
+                    None => {
+                        error!("Failed to add CEL types: Arc refcount > 1");
+                        registration_succeeded = false;
+                    }
                 }
             }
-            self.registered.insert(task_id.to_string());
+            if registration_succeeded {
+                self.registered.insert(task_id.to_string());
+            }
         }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/kuadrant/context.rs` around lines 540 - 552, Currently you insert task_id
into self.registered even when Arc::get_mut(&mut self.env) fails; change the
logic so task_id is only inserted when CEL type registration actually succeeds.
Specifically, for the block that checks !self.registered.contains(task_id) and
then calls task.cel_types(), if types is empty insert immediately; otherwise
attempt Arc::get_mut(&mut self.env) and only after successfully iterating and
calling env.add_struct(...) for each type_def insert
self.registered.insert(task_id.to_string()); do not insert when Arc::get_mut
returns None (log and bail out so future new_ctx() calls will retry
registration).

Comment on lines +95 to +101
ctx.set_public_tracker_id(var.clone());
#[allow(clippy::expect_used)]
let predicate = Predicate::new("true").expect("Needs to be valid!");
#[allow(clippy::expect_used)]
let headers_expr =
Expression::new(&format!("[['{var}', '{}']]", ctx.request_id()))
.expect("Needs to be valid CEL!");

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Avoid expect() on generated CEL in production.

This branch builds CEL source with var and ctx.request_id() and then panics if parsing fails. A quote or escape in either value turns this into a request-path crash rather than a recoverable configuration/runtime error. Please construct the header value without reparsing source, or propagate an error out of to_core_task().

As per coding guidelines: always use proper error handling with Result types and the ? operator instead of panic!, .unwrap(), or .expect() calls.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/kuadrant/pipeline/blueprint.rs` around lines 95 - 101, The code currently
calls Predicate::new("true").expect(...) and
Expression::new(&format!("[['{var}', '{}']]", ctx.request_id())).expect(...)
inside to_core_task(), which will panic on malformed var or request_id; change
to return a Result from to_core_task() (or propagate an existing Result) and
replace the expect() calls with fallible parsing using the ? operator so parse
errors propagate instead of panicking, or avoid reparsing by building the header
expression/value programmatically (e.g., construct the header pair/struct
directly rather than formatting CEL source) and use the existing
ctx.set_public_tracker_id(var.clone()) path; update function signature and call
sites to handle the Result accordingly and reference Predicate::new,
Expression::new, ctx.set_public_tracker_id, and to_core_task() when making the
change.

Source: Coding guidelines

Comment on lines +45 to +49
Ok(AttributeState::Pending) => {
if ctx.response_body.is_end_of_stream() {
TaskOutcome::Failed
} else {
TaskOutcome::Requeued(vec![self])

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Do not fail generic pending predicates at response EOS.

Predicate::test() can stay pending for non-stream inputs as well. Once Line 46 sees response EOS, this turns a still-valid fail action into TaskOutcome::Failed, and the executor will only log it instead of retrying. Requeue here unless this task can prove it is specifically body-stream bound.

Suggested fix
             Ok(AttributeState::Pending) => {
-                if ctx.response_body.is_end_of_stream() {
-                    TaskOutcome::Failed
-                } else {
-                    TaskOutcome::Requeued(vec![self])
-                }
+                TaskOutcome::Requeued(vec![self])
             }

Based on learnings: only apply the is_end_of_stream() guard pattern when Pending specifically means the task is waiting for stream-body data.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Ok(AttributeState::Pending) => {
if ctx.response_body.is_end_of_stream() {
TaskOutcome::Failed
} else {
TaskOutcome::Requeued(vec![self])
Ok(AttributeState::Pending) => {
TaskOutcome::Requeued(vec![self])
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/kuadrant/pipeline/tasks/fail.rs` around lines 45 - 49, The current match
arm for AttributeState::Pending prematurely returns TaskOutcome::Failed when
ctx.response_body.is_end_of_stream() is true; instead, change the logic in the
Pending arm so it requeues the task (TaskOutcome::Requeued(vec![self])) by
default and only return TaskOutcome::Failed when the task can prove it is
specifically waiting on the response body stream (e.g., add or check a boolean
like self.is_stream_bound() or a method like self.waits_on_body_stream());
update the match arm around AttributeState::Pending (the code referencing
ctx.response_body.is_end_of_stream(), TaskOutcome::Failed, and
TaskOutcome::Requeued(vec![self])) to requeue unless that explicit stream-bound
condition is true.

Source: Learnings

Comment on lines +78 to +81
let operation = match self.headers_expr.eval(ctx, &mut cel_ctx) {
Ok(AttributeState::Pending) => {
error!("Unexpected pending state in headers expression");
return TaskOutcome::Failed;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

headers_expr can be pending too.

Expression::eval() is allowed to return Pending here. Treating that as an error breaks header mutations that depend on later body chunks or other deferred attributes; this should requeue like the predicate path does.

Suggested fix
         let operation = match self.headers_expr.eval(ctx, &mut cel_ctx) {
             Ok(AttributeState::Pending) => {
-                error!("Unexpected pending state in headers expression");
-                return TaskOutcome::Failed;
+                return TaskOutcome::Requeued(vec![self]);
             }

Based on learnings: Pending in these tasks is not limited to stream-body data, and Expression::eval() returns Pending for unresolved attributes.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/kuadrant/pipeline/tasks/headers.rs` around lines 78 - 81, The match arm
for self.headers_expr.eval currently treats AttributeState::Pending as an error
and returns TaskOutcome::Failed; instead, handle AttributeState::Pending the
same way the predicate path does by returning TaskOutcome::Requeue (remove or
downgrade the error log), so unresolved/deferred attributes can be retried
later; update the match in headers handling (the call to headers_expr.eval and
the AttributeState::Pending arm) to return TaskOutcome::Requeue rather than
TaskOutcome::Failed.

Source: Learnings

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

1 participant