From 23ddb3cbb1d2790d680a0ba3e91d6bbe9ff8f9b3 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 21 May 2026 14:29:49 -0400 Subject: [PATCH 01/15] increase timeout for one additional archive cycle --- src/test/java/io/cryostat/rules/RulesArchiverTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/cryostat/rules/RulesArchiverTest.java b/src/test/java/io/cryostat/rules/RulesArchiverTest.java index b5df707ed..0600fd5a2 100644 --- a/src/test/java/io/cryostat/rules/RulesArchiverTest.java +++ b/src/test/java/io/cryostat/rules/RulesArchiverTest.java @@ -130,7 +130,9 @@ public void test() 50, TimeUnit.SECONDS); - webSocketClient.expectNotification("ArchivedRecordingDeleted", Duration.ofSeconds(50)); + // Wait 60s to allow for archive job to run after 50s rule deletion + // Archive jobs run every 10s, so we need buffer time for the job after deletion + webSocketClient.expectNotification("ArchivedRecordingDeleted", Duration.ofSeconds(60)); webSocketClient.expectNotification("RuleDeleted", Duration.ofSeconds(65)); given().log() From 577987ac7f431ebc68a748e17f5eed71c5f2bb8e Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Thu, 21 May 2026 15:36:46 -0400 Subject: [PATCH 02/15] reorder --- .../io/cryostat/rules/RulesArchiverTest.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/test/java/io/cryostat/rules/RulesArchiverTest.java b/src/test/java/io/cryostat/rules/RulesArchiverTest.java index 0600fd5a2..a7ddc8bdf 100644 --- a/src/test/java/io/cryostat/rules/RulesArchiverTest.java +++ b/src/test/java/io/cryostat/rules/RulesArchiverTest.java @@ -107,7 +107,12 @@ public void test() .then() .statusCode(201); - // stop further background jobs before checking results + // Wait for at least one deletion to occur before stopping the rule + // Archive jobs run every 10s with preservedArchives=3, so deletion happens when 3 exist + // This should occur by the 4th archive job (~30s), but allow extra time for delays + webSocketClient.expectNotification("ArchivedRecordingDeleted", Duration.ofSeconds(45)); + + // Now stop further background jobs before checking results worker.schedule( () -> { given().log() @@ -125,15 +130,11 @@ public void test() .statusCode(204) .body(Matchers.emptyOrNullString()); }, - // this is enough time for 4-5 copies to be made, but we expect the oldest to get - // rolled over so 3 should remain - 50, + // Schedule deletion shortly after we've confirmed at least one deletion occurred + 5, TimeUnit.SECONDS); - // Wait 60s to allow for archive job to run after 50s rule deletion - // Archive jobs run every 10s, so we need buffer time for the job after deletion - webSocketClient.expectNotification("ArchivedRecordingDeleted", Duration.ofSeconds(60)); - webSocketClient.expectNotification("RuleDeleted", Duration.ofSeconds(65)); + webSocketClient.expectNotification("RuleDeleted", Duration.ofSeconds(10)); given().log() .all() From 707a0abb4c2c5204c0e42f351740618ad348f81e Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Fri, 22 May 2026 09:49:17 -0400 Subject: [PATCH 03/15] more robust checks --- .../io/cryostat/rules/RulesArchiverTest.java | 58 ++++++++++--------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/src/test/java/io/cryostat/rules/RulesArchiverTest.java b/src/test/java/io/cryostat/rules/RulesArchiverTest.java index a7ddc8bdf..32b9fb7f0 100644 --- a/src/test/java/io/cryostat/rules/RulesArchiverTest.java +++ b/src/test/java/io/cryostat/rules/RulesArchiverTest.java @@ -107,34 +107,36 @@ public void test() .then() .statusCode(201); - // Wait for at least one deletion to occur before stopping the rule - // Archive jobs run every 10s with preservedArchives=3, so deletion happens when 3 exist - // This should occur by the 4th archive job (~30s), but allow extra time for delays - webSocketClient.expectNotification("ArchivedRecordingDeleted", Duration.ofSeconds(45)); - - // Now stop further background jobs before checking results - worker.schedule( - () -> { - given().log() - .all() - // do not clean, or else Cryostat will archive the recording on stop and - // create an additional copy - .queryParam("clean", false) - .pathParam("ruleName", RULE_NAME) - .delete("/api/v4/rules/{ruleName}") - .then() - .log() - .all() - .and() - .assertThat() - .statusCode(204) - .body(Matchers.emptyOrNullString()); - }, - // Schedule deletion shortly after we've confirmed at least one deletion occurred - 5, - TimeUnit.SECONDS); - - webSocketClient.expectNotification("RuleDeleted", Duration.ofSeconds(10)); + // Wait for archives to be created. With preservedArchives=3 and archivalPeriodSeconds=10: + // - 1st archive at ~0s + // - 2nd archive at ~10s + // - 3rd archive at ~20s + // - 4th archive at ~30s (triggers deletion of 1st, then creates 4th) + // Wait for 3 archives to be created to ensure we're at the point where deletion will occur + webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); + webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); + webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); + + // Now wait for the deletion that should occur with the 4th archive + webSocketClient.expectNotification("ArchivedRecordingDeleted", Duration.ofSeconds(20)); + + // Stop further background jobs before checking results + given().log() + .all() + // do not clean, or else Cryostat will archive the recording on stop and + // create an additional copy + .queryParam("clean", false) + .pathParam("ruleName", RULE_NAME) + .delete("/api/v4/rules/{ruleName}") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(204) + .body(Matchers.emptyOrNullString()); + + webSocketClient.expectNotification("RuleDeleted", Duration.ofSeconds(5)); given().log() .all() From 9f737c25196579c1a6c98df51bb9237bda8a1f11 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Fri, 22 May 2026 09:51:19 -0400 Subject: [PATCH 04/15] fixup! more robust checks --- src/test/java/io/cryostat/rules/RulesArchiverTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/io/cryostat/rules/RulesArchiverTest.java b/src/test/java/io/cryostat/rules/RulesArchiverTest.java index 32b9fb7f0..eaccfa4e7 100644 --- a/src/test/java/io/cryostat/rules/RulesArchiverTest.java +++ b/src/test/java/io/cryostat/rules/RulesArchiverTest.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import io.cryostat.AbstractTransactionalTestBase; From 5a69b86e046dcf42dced0c7aea1a38e458d0607e Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Fri, 22 May 2026 09:55:35 -0400 Subject: [PATCH 05/15] fixup! more robust checks --- src/test/java/io/cryostat/rules/RulesArchiverTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/cryostat/rules/RulesArchiverTest.java b/src/test/java/io/cryostat/rules/RulesArchiverTest.java index eaccfa4e7..ca2f6a3d1 100644 --- a/src/test/java/io/cryostat/rules/RulesArchiverTest.java +++ b/src/test/java/io/cryostat/rules/RulesArchiverTest.java @@ -116,9 +116,12 @@ public void test() webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); - // Now wait for the deletion that should occur with the 4th archive + // Now wait for the deletion that should occur with the 4th archive job webSocketClient.expectNotification("ArchivedRecordingDeleted", Duration.ofSeconds(20)); + // Wait for the 4th archive to be created (happens after deletion in same job) + webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(5)); + // Stop further background jobs before checking results given().log() .all() From 4bd4bcbe17b447e2944d6362ea37f0c303d4ad3b Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Fri, 22 May 2026 10:53:48 -0400 Subject: [PATCH 06/15] increase delays --- .../io/cryostat/rules/RulesArchiverTest.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/test/java/io/cryostat/rules/RulesArchiverTest.java b/src/test/java/io/cryostat/rules/RulesArchiverTest.java index ca2f6a3d1..a2bd5e42d 100644 --- a/src/test/java/io/cryostat/rules/RulesArchiverTest.java +++ b/src/test/java/io/cryostat/rules/RulesArchiverTest.java @@ -107,17 +107,20 @@ public void test() .statusCode(201); // Wait for archives to be created. With preservedArchives=3 and archivalPeriodSeconds=10: - // - 1st archive at ~0s - // - 2nd archive at ~10s - // - 3rd archive at ~20s - // - 4th archive at ~30s (triggers deletion of 1st, then creates 4th) - // Wait for 3 archives to be created to ensure we're at the point where deletion will occur - webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); + // Note: initialDelaySeconds=0 is converted to archivalPeriodSeconds (10s) in RuleExecutor + // - 1st archive at ~10s (initial delay) + // - 2nd archive at ~20s + // - 3rd archive at ~30s + // - 4th archive at ~40s (triggers deletion of 1st, then creates 4th) + + // Wait for first 3 archives to be created (allow time for rule activation + recording + // start) + webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(20)); webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); - // Now wait for the deletion that should occur with the 4th archive job - webSocketClient.expectNotification("ArchivedRecordingDeleted", Duration.ofSeconds(20)); + // Wait for deletion that occurs with 4th archive job + webSocketClient.expectNotification("ArchivedRecordingDeleted", Duration.ofSeconds(15)); // Wait for the 4th archive to be created (happens after deletion in same job) webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(5)); From 2b4c7a1b6e579725ce85606c7240d8e3559d3b91 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Fri, 22 May 2026 12:54:22 -0400 Subject: [PATCH 07/15] wait for recording to start first, then wait for archives to come in --- .../io/cryostat/rules/RulesArchiverTest.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/test/java/io/cryostat/rules/RulesArchiverTest.java b/src/test/java/io/cryostat/rules/RulesArchiverTest.java index a2bd5e42d..7fe5e06b5 100644 --- a/src/test/java/io/cryostat/rules/RulesArchiverTest.java +++ b/src/test/java/io/cryostat/rules/RulesArchiverTest.java @@ -106,16 +106,18 @@ public void test() .then() .statusCode(201); + // Wait for the rule to activate and start the recording + webSocketClient.expectNotification("ActiveRecordingCreated", Duration.ofSeconds(10)); + // Wait for archives to be created. With preservedArchives=3 and archivalPeriodSeconds=10: // Note: initialDelaySeconds=0 is converted to archivalPeriodSeconds (10s) in RuleExecutor - // - 1st archive at ~10s (initial delay) - // - 2nd archive at ~20s - // - 3rd archive at ~30s - // - 4th archive at ~40s (triggers deletion of 1st, then creates 4th) - - // Wait for first 3 archives to be created (allow time for rule activation + recording - // start) - webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(20)); + // - 1st archive at ~10s after recording starts (initial delay) + // - 2nd archive at ~20s after recording starts + // - 3rd archive at ~30s after recording starts + // - 4th archive at ~40s after recording starts (triggers deletion of 1st, then creates 4th) + + // Wait for first 3 archives to be created + webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); From cb12a1d85a6161cb6a5426adeef00909c53c5e55 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Fri, 22 May 2026 16:25:36 -0400 Subject: [PATCH 08/15] expect rule creation notification --- src/test/java/io/cryostat/rules/RulesArchiverTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/cryostat/rules/RulesArchiverTest.java b/src/test/java/io/cryostat/rules/RulesArchiverTest.java index 7fe5e06b5..86fb417a5 100644 --- a/src/test/java/io/cryostat/rules/RulesArchiverTest.java +++ b/src/test/java/io/cryostat/rules/RulesArchiverTest.java @@ -106,8 +106,11 @@ public void test() .then() .statusCode(201); + // Wait for rule creation to be processed + webSocketClient.expectNotification("RuleCreated", Duration.ofSeconds(5)); + // Wait for the rule to activate and start the recording - webSocketClient.expectNotification("ActiveRecordingCreated", Duration.ofSeconds(10)); + webSocketClient.expectNotification("ActiveRecordingCreated", Duration.ofSeconds(15)); // Wait for archives to be created. With preservedArchives=3 and archivalPeriodSeconds=10: // Note: initialDelaySeconds=0 is converted to archivalPeriodSeconds (10s) in RuleExecutor From 07f8525124536092d67cdab02a07792c9ac6214a Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Mon, 25 May 2026 07:25:08 -0400 Subject: [PATCH 09/15] fix(rules): improve error handling --- .../java/io/cryostat/rules/RuleExecutor.java | 54 +++++++++++-------- .../java/io/cryostat/rules/RuleService.java | 2 +- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/src/main/java/io/cryostat/rules/RuleExecutor.java b/src/main/java/io/cryostat/rules/RuleExecutor.java index cd8659d98..095f4e42d 100644 --- a/src/main/java/io/cryostat/rules/RuleExecutor.java +++ b/src/main/java/io/cryostat/rules/RuleExecutor.java @@ -88,24 +88,25 @@ Uni onMessage(ActivationAttempt attempt) { logger.tracev( "Attempting to activate rule \"{0}\" for target {1} - attempt #{2}", attempt.ruleId(), attempt.targetId(), attempt.attempts()); - try { - var targetOpt = Target.find("id", attempt.targetId()).firstResultOptional(); - if (targetOpt.isEmpty()) { - logger.warnv( - "Target {0} no longer exists, skipping rule activation attempt", - attempt.targetId()); - return Uni.createFrom().nullItem(); - } - Target target = targetOpt.get(); - var ruleOpt = Rule.find("id", attempt.ruleId()).firstResultOptional(); - if (ruleOpt.isEmpty()) { - logger.warnv( - "Rule {0} no longer exists, skipping activation attempt", attempt.ruleId()); - return Uni.createFrom().nullItem(); - } - Rule rule = ruleOpt.get(); + var targetOpt = Target.find("id", attempt.targetId()).firstResultOptional(); + if (targetOpt.isEmpty()) { + logger.warnv( + "Target {0} no longer exists, skipping rule activation attempt", + attempt.targetId()); + return Uni.createFrom().nullItem(); + } + Target target = targetOpt.get(); + var ruleOpt = Rule.find("id", attempt.ruleId()).firstResultOptional(); + if (ruleOpt.isEmpty()) { + logger.warnv( + "Rule {0} no longer exists, skipping activation attempt", attempt.ruleId()); + return Uni.createFrom().nullItem(); + } + Rule rule = ruleOpt.get(); + + try { Pair pair = recordingHelper.parseEventSpecifier(rule.eventSpecifier); Template template = @@ -130,20 +131,31 @@ Uni onMessage(ActivationAttempt attempt) { createRecordingOptions(rule), labels) .await() - .indefinitely(); + .atMost(java.time.Duration.ofSeconds(30)); } catch (EntityExistsException eee) { - // ignore - the recording already existed and was running, so we don't want to - // replace it - but we should continue on to reschedule the periodic archival job + logger.debugv( + "Recording \"{0}\" already exists on target {1}, fetching existing" + + " recording", + rule.getRecordingName(), target.id); + var existingRecording = + recordingHelper.getActiveRecording( + target, r -> Objects.equals(r.name, rule.getRecordingName())); + if (existingRecording.isPresent()) { + recording = existingRecording.get(); + } else { + logger.warnv( + "Recording \"{0}\" should exist on target {1} but was not found", + rule.getRecordingName(), target.id); + } } if (recording != null && rule.isArchiver()) { scheduleArchival(rule, target, recording); } + return Uni.createFrom().nullItem(); } catch (Exception e) { logger.error("Rule execution failed", e); return Uni.createFrom().failure(e); } - - return Uni.createFrom().nullItem(); } @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index dcc21853b..924a1b7ff 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -205,7 +205,7 @@ private void fireAttemptExecution(ActivationAttempt fAttempt) { int count = fAttempt.attempts.incrementAndGet(); int delay = (int) Math.pow(2, count); TimeUnit unit = TimeUnit.SECONDS; - int limit = 5; + int limit = 12; if (count < limit) { logger.debugv( "Rule \"{0}\" activation attempt" From 61f779973f617bd111f93678bd492adee8514422 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Mon, 25 May 2026 09:45:53 -0400 Subject: [PATCH 10/15] separate transaction boundaries --- .../java/io/cryostat/rules/RuleExecutor.java | 106 +++++++++--------- 1 file changed, 55 insertions(+), 51 deletions(-) diff --git a/src/main/java/io/cryostat/rules/RuleExecutor.java b/src/main/java/io/cryostat/rules/RuleExecutor.java index 095f4e42d..0d76b1df6 100644 --- a/src/main/java/io/cryostat/rules/RuleExecutor.java +++ b/src/main/java/io/cryostat/rules/RuleExecutor.java @@ -38,6 +38,7 @@ import io.cryostat.targets.Target.TargetDiscovery; import io.cryostat.util.EntityExistsException; +import io.quarkus.narayana.jta.QuarkusTransaction; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.vertx.ConsumeEvent; import io.smallrye.mutiny.Uni; @@ -83,7 +84,6 @@ void onStop(@Observes ShutdownEvent evt) throws SchedulerException { } @ConsumeEvent(blocking = true) - @Transactional Uni onMessage(ActivationAttempt attempt) { logger.tracev( "Attempting to activate rule \"{0}\" for target {1} - attempt #{2}", @@ -106,56 +106,7 @@ Uni onMessage(ActivationAttempt attempt) { } Rule rule = ruleOpt.get(); - try { - Pair pair = - recordingHelper.parseEventSpecifier(rule.eventSpecifier); - Template template = - recordingHelper.getPreferredTemplate(target, pair.getKey(), pair.getValue()); - - var priorRecording = - recordingHelper.getActiveRecording( - target, r -> Objects.equals(r.name, rule.getRecordingName())); - if (priorRecording.isPresent()) { - recordingHelper.stopRecording(priorRecording.get()).await().indefinitely(); - } - var labels = new HashMap<>(rule.metadata.labels()); - labels.put(RULE_LABEL_KEY, rule.name); - ActiveRecording recording = null; - try { - recording = - recordingHelper - .startRecording( - target, - RecordingReplace.STOPPED, - template, - createRecordingOptions(rule), - labels) - .await() - .atMost(java.time.Duration.ofSeconds(30)); - } catch (EntityExistsException eee) { - logger.debugv( - "Recording \"{0}\" already exists on target {1}, fetching existing" - + " recording", - rule.getRecordingName(), target.id); - var existingRecording = - recordingHelper.getActiveRecording( - target, r -> Objects.equals(r.name, rule.getRecordingName())); - if (existingRecording.isPresent()) { - recording = existingRecording.get(); - } else { - logger.warnv( - "Recording \"{0}\" should exist on target {1} but was not found", - rule.getRecordingName(), target.id); - } - } - if (recording != null && rule.isArchiver()) { - scheduleArchival(rule, target, recording); - } - return Uni.createFrom().nullItem(); - } catch (Exception e) { - logger.error("Rule execution failed", e); - return Uni.createFrom().failure(e); - } + return QuarkusTransaction.joiningExisting().call(() -> activate(target, rule)); } @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) @@ -213,6 +164,59 @@ public void handleRuleRecordingCleanup(Rule rule) { logger.debugv("Cancelled scheduled tasks for rule \"{0}\"", rule.name); } + private Uni activate(Target target, Rule rule) { + try { + Pair pair = + recordingHelper.parseEventSpecifier(rule.eventSpecifier); + Template template = + recordingHelper.getPreferredTemplate(target, pair.getKey(), pair.getValue()); + + var priorRecording = + recordingHelper.getActiveRecording( + target, r -> Objects.equals(r.name, rule.getRecordingName())); + if (priorRecording.isPresent()) { + recordingHelper.stopRecording(priorRecording.get()).await().indefinitely(); + } + var labels = new HashMap<>(rule.metadata.labels()); + labels.put(RULE_LABEL_KEY, rule.name); + ActiveRecording recording = null; + try { + recording = + recordingHelper + .startRecording( + target, + RecordingReplace.STOPPED, + template, + createRecordingOptions(rule), + labels) + .await() + .atMost(java.time.Duration.ofSeconds(30)); + } catch (EntityExistsException eee) { + logger.debugv( + "Recording \"{0}\" already exists on target {1}, fetching existing" + + " recording", + rule.getRecordingName(), target.id); + var existingRecording = + recordingHelper.getActiveRecording( + target, r -> Objects.equals(r.name, rule.getRecordingName())); + if (existingRecording.isPresent()) { + recording = existingRecording.get(); + } else { + logger.warnv( + "Recording \"{0}\" should exist on target {1} but was not found", + rule.getRecordingName(), target.id); + } + } + if (recording != null && rule.isArchiver()) { + scheduleArchival(rule, target, recording); + } + return Uni.createFrom().nullItem(); + } catch (Exception e) { + logger.error("Rule execution failed", e); + return Uni.createFrom().failure(e); + } + } + private void cancelTasksForRule(Rule rule) { if (rule.isArchiver()) { List targets = From f3e6b49efb01f09462add358b748b177f47121d2 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Mon, 25 May 2026 11:37:36 -0400 Subject: [PATCH 11/15] clear queues between tests --- src/main/java/io/cryostat/rules/RuleService.java | 11 +++++++++++ .../io/cryostat/AbstractTransactionalTestBase.java | 4 ++++ 2 files changed, 15 insertions(+) diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index 924a1b7ff..ea2d6dbc1 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -130,8 +130,19 @@ void onStart(@Observes StartupEvent ev) { void onStop(@Observes ShutdownEvent evt) throws SchedulerException { activator.shutdown(); cleaner.shutdown(); + clearQueues(); + } + + /** + * Clear activation and cleanup queues. This is primarily useful for testing to ensure clean + * state between test runs. + */ + public void clearQueues() { activations.clear(); cleanups.clear(); + logger.debugv( + "Cleared activation and cleanup queues (activations: {0}, cleanups: {1})", + activations.size(), cleanups.size()); } @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) diff --git a/src/test/java/io/cryostat/AbstractTransactionalTestBase.java b/src/test/java/io/cryostat/AbstractTransactionalTestBase.java index cf49682b3..87a115089 100644 --- a/src/test/java/io/cryostat/AbstractTransactionalTestBase.java +++ b/src/test/java/io/cryostat/AbstractTransactionalTestBase.java @@ -15,6 +15,8 @@ */ package io.cryostat; +import io.cryostat.rules.RuleService; + import jakarta.inject.Inject; import jakarta.persistence.EntityManager; import org.flywaydb.core.Flyway; @@ -25,6 +27,7 @@ public abstract class AbstractTransactionalTestBase extends AbstractTestBase { @Inject Flyway flyway; @Inject EntityManager entityManager; + @Inject RuleService ruleService; @BeforeEach void migrateFlyway() throws SchedulerException { @@ -33,6 +36,7 @@ void migrateFlyway() throws SchedulerException { flyway.migrate(); flyway.validate(); entityManager.clear(); + ruleService.clearQueues(); restartScheduler(); } } From 2596533de04b197e62df1ed05c51a216c345bbc1 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Mon, 25 May 2026 11:47:51 -0400 Subject: [PATCH 12/15] fix: orphaned recording detection --- .../java/io/cryostat/recordings/RecordingHelper.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/cryostat/recordings/RecordingHelper.java b/src/main/java/io/cryostat/recordings/RecordingHelper.java index 7f8861bc9..f6e93f5e2 100644 --- a/src/main/java/io/cryostat/recordings/RecordingHelper.java +++ b/src/main/java/io/cryostat/recordings/RecordingHelper.java @@ -293,11 +293,19 @@ private List listActiveRecordingsImpl(Target target) { List descriptors = connectionManager.executeConnectedTask( target, conn -> conn.getService().getAvailableRecordings()); + + var remoteIds = + new HashSet<>(descriptors.stream().map(IRecordingDescriptor::getId).toList()); + boolean updated = false; var it = target.activeRecordings.iterator(); while (it.hasNext()) { var r = it.next(); - if (!previousIds.contains(r.remoteId)) { + if (!remoteIds.contains(r.remoteId)) { + logger.warnv( + "Orphaned recording detected: id={0} remoteId={1} name={2} on target" + + " {3}, removing from database", + r.id, r.remoteId, r.name, target.connectUrl); r.delete(); it.remove(); updated |= true; From 545054bf43ece9cad8a19a2cae2c4ce61b717adc Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Mon, 25 May 2026 11:49:21 -0400 Subject: [PATCH 13/15] improve remote recording vs ActiveRecording entity error handling and state tracking --- .../cryostat/recordings/RecordingHelper.java | 212 ++++++++++-------- .../java/io/cryostat/rules/RuleExecutor.java | 12 +- 2 files changed, 135 insertions(+), 89 deletions(-) diff --git a/src/main/java/io/cryostat/recordings/RecordingHelper.java b/src/main/java/io/cryostat/recordings/RecordingHelper.java index f6e93f5e2..3e756e758 100644 --- a/src/main/java/io/cryostat/recordings/RecordingHelper.java +++ b/src/main/java/io/cryostat/recordings/RecordingHelper.java @@ -477,101 +477,137 @@ private ActiveRecording startRecordingImpl( } getActiveRecording(lockedTarget, r -> r.name.equals(recordingName)) .ifPresent(r -> this.deleteRecording(r).await().indefinitely()); - var desc = - connectionManager.executeConnectedTask( - lockedTarget, - conn -> { - RecordingOptionsBuilder optionsBuilder = - recordingOptionsBuilderFactory - .create(lockedTarget) - .name(recordingName); - if (options.duration().isPresent()) { - optionsBuilder = - optionsBuilder.duration( - TimeUnit.SECONDS.toMillis( - options.duration().get())); - } - if (options.toDisk().isPresent()) { - optionsBuilder = optionsBuilder.toDisk(options.toDisk().get()); - } - if (options.maxAge().isPresent()) { - optionsBuilder = optionsBuilder.maxAge(options.maxAge().get()); - } - if (options.maxSize().isPresent()) { - optionsBuilder = optionsBuilder.maxSize(options.maxSize().get()); - } - IConstrainedMap recordingOptions = optionsBuilder.build(); - - switch (template.getType()) { - case PRESET: - return conn.getService() - .start( - recordingOptions, - presetTemplateService - .getXml( - template.getName(), - TemplateType.CUSTOM) - .orElseThrow()); - case CUSTOM: - return conn.getService() - .start( - recordingOptions, - customTemplateService - .getXml( - template.getName(), - TemplateType.CUSTOM) - .orElseThrow()); - case TARGET: - return conn.getService().start(recordingOptions, template); - default: - throw new IllegalStateException( - "Unknown template type: " + template.getType()); - } - }); + + IRecordingDescriptor desc; + try { + desc = + connectionManager.executeConnectedTask( + lockedTarget, + conn -> { + RecordingOptionsBuilder optionsBuilder = + recordingOptionsBuilderFactory + .create(lockedTarget) + .name(recordingName); + if (options.duration().isPresent()) { + optionsBuilder = + optionsBuilder.duration( + TimeUnit.SECONDS.toMillis( + options.duration().get())); + } + if (options.toDisk().isPresent()) { + optionsBuilder = optionsBuilder.toDisk(options.toDisk().get()); + } + if (options.maxAge().isPresent()) { + optionsBuilder = optionsBuilder.maxAge(options.maxAge().get()); + } + if (options.maxSize().isPresent()) { + optionsBuilder = + optionsBuilder.maxSize(options.maxSize().get()); + } + IConstrainedMap recordingOptions = optionsBuilder.build(); + + switch (template.getType()) { + case PRESET: + return conn.getService() + .start( + recordingOptions, + presetTemplateService + .getXml( + template.getName(), + TemplateType.CUSTOM) + .orElseThrow()); + case CUSTOM: + return conn.getService() + .start( + recordingOptions, + customTemplateService + .getXml( + template.getName(), + TemplateType.CUSTOM) + .orElseThrow()); + case TARGET: + return conn.getService().start(recordingOptions, template); + default: + throw new IllegalStateException( + "Unknown template type: " + template.getType()); + } + }); + } catch (Exception e) { + logger.errorv( + e, + "Failed to start remote recording \"{0}\" on target {1}", + recordingName, + lockedTarget.connectUrl); + throw e; + } Map labels = new HashMap<>(rawLabels); labels.put("template.name", template.getName()); labels.put("template.type", template.getType().toString()); Metadata meta = new Metadata(labels); - ActiveRecording recording = ActiveRecording.from(lockedTarget, desc, meta, options); - - Optional existingOpt = - ActiveRecording.find( - "target.id = ?1 and remoteId = ?2", - lockedTarget.id, - recording.remoteId) - .firstResultOptional(); - - if (existingOpt.isPresent()) { - ActiveRecording existingRecording = existingOpt.get(); - logger.infov( - "Found existing recording id={0} remoteId={1} name={2}," - + " merging state from startRecording", - existingRecording.id, existingRecording.remoteId, existingRecording.name); - - existingRecording.name = recording.name; - existingRecording.state = recording.state; - existingRecording.duration = recording.duration; - existingRecording.startTime = recording.startTime; - existingRecording.archiveOnStop = recording.archiveOnStop; - existingRecording.continuous = recording.continuous; - existingRecording.toDisk = recording.toDisk; - existingRecording.maxSize = recording.maxSize; - existingRecording.maxAge = recording.maxAge; - existingRecording.external = recording.external; - - var mergedLabels = new HashMap<>(existingRecording.metadata.labels()); - mergedLabels.putAll(recording.metadata.labels()); - existingRecording.metadata = new Metadata(mergedLabels); - - existingRecording.persist(); - recording = existingRecording; - } else { - recording.persist(); - } + ActiveRecording recording; + try { + recording = ActiveRecording.from(lockedTarget, desc, meta, options); + + Optional existingOpt = + ActiveRecording.find( + "target.id = ?1 and remoteId = ?2", + lockedTarget.id, + recording.remoteId) + .firstResultOptional(); + + if (existingOpt.isPresent()) { + ActiveRecording existingRecording = existingOpt.get(); + logger.infov( + "Found existing recording id={0} remoteId={1} name={2}," + + " merging state from startRecording", + existingRecording.id, existingRecording.remoteId, existingRecording.name); + + existingRecording.name = recording.name; + existingRecording.state = recording.state; + existingRecording.duration = recording.duration; + existingRecording.startTime = recording.startTime; + existingRecording.archiveOnStop = recording.archiveOnStop; + existingRecording.continuous = recording.continuous; + existingRecording.toDisk = recording.toDisk; + existingRecording.maxSize = recording.maxSize; + existingRecording.maxAge = recording.maxAge; + existingRecording.external = recording.external; + + var mergedLabels = new HashMap<>(existingRecording.metadata.labels()); + mergedLabels.putAll(recording.metadata.labels()); + existingRecording.metadata = new Metadata(mergedLabels); + + existingRecording.persist(); + recording = existingRecording; + } else { + recording.persist(); + } - lockedTarget.activeRecordings.add(recording); + lockedTarget.activeRecordings.add(recording); + } catch (Exception e) { + logger.errorv( + e, + "Failed to persist recording \"{0}\" to database, attempting to stop remote" + + " recording", + recordingName); + try { + connectionManager.executeConnectedTask( + lockedTarget, + conn -> { + conn.getService().close(desc); + return null; + }); + } catch (Exception cleanupEx) { + logger.warnv( + cleanupEx, + "Failed to cleanup remote recording \"{0}\" after database persist" + + " failure", + recordingName); + } + throw e; + } if (!recording.continuous) { JobKey key = diff --git a/src/main/java/io/cryostat/rules/RuleExecutor.java b/src/main/java/io/cryostat/rules/RuleExecutor.java index 0d76b1df6..35bd996dd 100644 --- a/src/main/java/io/cryostat/rules/RuleExecutor.java +++ b/src/main/java/io/cryostat/rules/RuleExecutor.java @@ -209,10 +209,20 @@ private Uni activate(Target target, Rule rule) { } if (recording != null && rule.isArchiver()) { scheduleArchival(rule, target, recording); + } else if (recording == null) { + logger.errorv( + "Failed to activate rule \"{0}\" on target {1}: recording is null", + rule.name, target.connectUrl); + throw new IllegalStateException( + "Recording activation failed but no exception was thrown"); } return Uni.createFrom().nullItem(); } catch (Exception e) { - logger.error("Rule execution failed", e); + logger.errorv( + e, + "Rule \"{0}\" activation failed on target {1}", + rule.name, + target.connectUrl); return Uni.createFrom().failure(e); } } From cfb90a8c6c8f49f1853ad1e691299aac2211f333 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Mon, 25 May 2026 16:39:51 -0400 Subject: [PATCH 14/15] replace in-memory activation/cleanup queues with Quartz job and fault tolerant methods --- src/main/docker/include/cryostat.jfc | 16 + .../cryostat/rules/RecordingCleanupJob.java | 65 ++ .../io/cryostat/rules/RuleActivationJob.java | 62 ++ .../java/io/cryostat/rules/RuleExecutor.java | 30 +- .../java/io/cryostat/rules/RuleService.java | 656 ++++++++++-------- .../AbstractTransactionalTestBase.java | 4 - .../io/cryostat/rules/RulesArchiverTest.java | 5 - 7 files changed, 515 insertions(+), 323 deletions(-) create mode 100644 src/main/java/io/cryostat/rules/RecordingCleanupJob.java create mode 100644 src/main/java/io/cryostat/rules/RuleActivationJob.java diff --git a/src/main/docker/include/cryostat.jfc b/src/main/docker/include/cryostat.jfc index ba58198f2..7e0dfbd57 100644 --- a/src/main/docker/include/cryostat.jfc +++ b/src/main/docker/include/cryostat.jfc @@ -35,6 +35,22 @@ 0 ms + + + true + + + 0 ms + + + + + true + + + 0 ms + + true diff --git a/src/main/java/io/cryostat/rules/RecordingCleanupJob.java b/src/main/java/io/cryostat/rules/RecordingCleanupJob.java new file mode 100644 index 000000000..798fc8005 --- /dev/null +++ b/src/main/java/io/cryostat/rules/RecordingCleanupJob.java @@ -0,0 +1,65 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.rules; + +import jakarta.inject.Inject; +import org.jboss.logging.Logger; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +/** + * Quartz job that delegates to RuleService for fault-tolerant recording cleanup. + * + *

This job is scheduled when a recording needs to be cleaned up (stopped). It delegates to + * RuleService.cleanupRecording() which handles all retry logic, timeouts, and backpressure via + * SmallRye Fault Tolerance annotations. + * + *

The job is one-time only and unscheduled after execution (success or failure). + */ +@DisallowConcurrentExecution +public class RecordingCleanupJob implements Job { + + @Inject Logger logger; + @Inject RuleService ruleService; + + @Override + public void execute(JobExecutionContext ctx) throws JobExecutionException { + String ruleName = ctx.getMergedJobDataMap().getString("ruleName"); + String jvmId = ctx.getMergedJobDataMap().getString("jvmId"); + String recordingName = ctx.getMergedJobDataMap().getString("recordingName"); + + logger.debugv( + "Executing cleanup job: rule={0} jvmId={1} recording={2}", + ruleName, jvmId, recordingName); + + try { + ruleService.cleanupRecording(ruleName, jvmId, recordingName).await().indefinitely(); + logger.debugv("Recording cleanup completed: rule={0} jvmId={1}", ruleName, jvmId); + ctx.getScheduler().unscheduleJob(ctx.getTrigger().getKey()); + } catch (Exception e) { + logger.errorv( + e, + "Recording cleanup failed after all retries: rule={0} jvmId={1}", + ruleName, + jvmId); + JobExecutionException ex = new JobExecutionException(e); + ex.setUnscheduleFiringTrigger(true); + throw ex; + } + } +} diff --git a/src/main/java/io/cryostat/rules/RuleActivationJob.java b/src/main/java/io/cryostat/rules/RuleActivationJob.java new file mode 100644 index 000000000..90d580448 --- /dev/null +++ b/src/main/java/io/cryostat/rules/RuleActivationJob.java @@ -0,0 +1,62 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.rules; + +import jakarta.inject.Inject; +import org.jboss.logging.Logger; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +/** + * Quartz job that delegates to RuleService for fault-tolerant rule activation. + * + *

This job is scheduled when a rule needs to be activated on a target. It delegates to + * RuleService.activateRule() which handles all retry logic, timeouts, and backpressure via SmallRye + * Fault Tolerance annotations. + * + *

The job is one-time only and unscheduled after execution (success or failure). + */ +@DisallowConcurrentExecution +public class RuleActivationJob implements Job { + + @Inject Logger logger; + @Inject RuleService ruleService; + + @Override + public void execute(JobExecutionContext ctx) throws JobExecutionException { + String ruleName = ctx.getMergedJobDataMap().getString("ruleName"); + String jvmId = ctx.getMergedJobDataMap().getString("jvmId"); + + logger.debugv("Executing rule activation job: rule={0} jvmId={1}", ruleName, jvmId); + + try { + ruleService.activateRule(ruleName, jvmId).await().indefinitely(); + logger.debugv("Rule activation completed: rule={0} jvmId={1}", ruleName, jvmId); + ctx.getScheduler().unscheduleJob(ctx.getTrigger().getKey()); + } catch (Exception e) { + logger.errorv( + e, + "Rule activation failed after all retries: rule={0} jvmId={1}", + ruleName, + jvmId); + JobExecutionException ex = new JobExecutionException(e); + ex.setUnscheduleFiringTrigger(true); + throw ex; + } + } +} diff --git a/src/main/java/io/cryostat/rules/RuleExecutor.java b/src/main/java/io/cryostat/rules/RuleExecutor.java index 35bd996dd..7d7f7e033 100644 --- a/src/main/java/io/cryostat/rules/RuleExecutor.java +++ b/src/main/java/io/cryostat/rules/RuleExecutor.java @@ -33,12 +33,10 @@ import io.cryostat.recordings.RecordingHelper.RecordingOptions; import io.cryostat.recordings.RecordingHelper.RecordingReplace; import io.cryostat.rules.Rule.RuleEvent; -import io.cryostat.rules.RuleService.ActivationAttempt; import io.cryostat.targets.Target; import io.cryostat.targets.Target.TargetDiscovery; import io.cryostat.util.EntityExistsException; -import io.quarkus.narayana.jta.QuarkusTransaction; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.vertx.ConsumeEvent; import io.smallrye.mutiny.Uni; @@ -83,32 +81,6 @@ void onStop(@Observes ShutdownEvent evt) throws SchedulerException { quartz.shutdown(); } - @ConsumeEvent(blocking = true) - Uni onMessage(ActivationAttempt attempt) { - logger.tracev( - "Attempting to activate rule \"{0}\" for target {1} - attempt #{2}", - attempt.ruleId(), attempt.targetId(), attempt.attempts()); - - var targetOpt = Target.find("id", attempt.targetId()).firstResultOptional(); - if (targetOpt.isEmpty()) { - logger.warnv( - "Target {0} no longer exists, skipping rule activation attempt", - attempt.targetId()); - return Uni.createFrom().nullItem(); - } - Target target = targetOpt.get(); - - var ruleOpt = Rule.find("id", attempt.ruleId()).firstResultOptional(); - if (ruleOpt.isEmpty()) { - logger.warnv( - "Rule {0} no longer exists, skipping activation attempt", attempt.ruleId()); - return Uni.createFrom().nullItem(); - } - Rule rule = ruleOpt.get(); - - return QuarkusTransaction.joiningExisting().call(() -> activate(target, rule)); - } - @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) void onMessage(TargetDiscovery event) { switch (event.kind()) { @@ -164,7 +136,7 @@ public void handleRuleRecordingCleanup(Rule rule) { logger.debugv("Cancelled scheduled tasks for rule \"{0}\"", rule.name); } - private Uni activate(Target target, Rule rule) { + public Uni activate(Target target, Rule rule) { try { Pair pair = recordingHelper.parseEventSpecifier(rule.eventSpecifier); diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index ea2d6dbc1..779e19690 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -16,21 +16,13 @@ package io.cryostat.rules; import java.time.Duration; -import java.util.Comparator; -import java.util.Iterator; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Objects; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; +import java.util.Set; import io.cryostat.ConfigProperties; import io.cryostat.expressions.MatchExpressionEvaluator; -import io.cryostat.recordings.ActiveRecording; import io.cryostat.recordings.RecordingHelper; import io.cryostat.rules.Rule.RuleEvent; import io.cryostat.targets.Target; @@ -41,19 +33,32 @@ import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.quarkus.vertx.ConsumeEvent; -import io.smallrye.mutiny.infrastructure.Infrastructure; -import io.vertx.ext.web.handler.HttpException; -import io.vertx.mutiny.core.eventbus.EventBus; +import io.smallrye.mutiny.Uni; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import jakarta.persistence.NoResultException; import jakarta.transaction.Transactional; import jakarta.ws.rs.NotFoundException; +import jdk.jfr.Description; +import jdk.jfr.Event; +import jdk.jfr.Label; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.faulttolerance.Asynchronous; +import org.eclipse.microprofile.faulttolerance.Bulkhead; +import org.eclipse.microprofile.faulttolerance.CircuitBreaker; +import org.eclipse.microprofile.faulttolerance.Retry; +import org.eclipse.microprofile.faulttolerance.Timeout; import org.jboss.logging.Logger; import org.projectnessie.cel.tools.ScriptException; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.Scheduler; import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; +import org.quartz.impl.matchers.GroupMatcher; /** * Handle {@link io.cryostat.target.Target} instances appearing and disappearing, and {@link @@ -71,97 +76,47 @@ public class RuleService { @Inject Logger logger; @Inject MatchExpressionEvaluator evaluator; @Inject RecordingHelper recordingHelper; - @Inject EventBus bus; + @Inject RuleExecutor ruleExecutor; + @Inject Scheduler quartz; @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) Duration connectionFailedTimeout; - private final BlockingQueue activations = - new PriorityBlockingQueue<>(255, Comparator.comparing(t -> t.attempts.get())); - private final BlockingQueue cleanups = - new PriorityBlockingQueue<>(255, Comparator.comparing(t -> t.attempts.get())); - private final ExecutorService activator = Executors.newSingleThreadExecutor(); - private final ExecutorService cleaner = Executors.newSingleThreadExecutor(); - private final ExecutorService workers = Executors.newVirtualThreadPerTaskExecutor(); - void onStart(@Observes StartupEvent ev) { logger.trace("RuleService started"); - activator.submit( - () -> { - for (Rule rule : enabledRules()) { - try { - QuarkusTransaction.requiringNew() - .run(() -> applyRuleToMatchingTargets(rule)); - } catch (Exception e) { - logger.error(e); - } - } - }); - activator.submit( - () -> { - while (!activator.isShutdown()) { - ActivationAttempt attempt = null; - try { - attempt = activations.take(); - } catch (InterruptedException ie) { - logger.trace(ie); - break; - } - final ActivationAttempt fAttempt = attempt; - workers.submit(() -> fireAttemptExecution(fAttempt)); - } - }); - cleaner.submit( - () -> { - while (!cleaner.isShutdown()) { - CleanupAttempt attempt = null; - try { - attempt = cleanups.take(); - } catch (InterruptedException ie) { - logger.trace(ie); - break; - } - final CleanupAttempt fAttempt = attempt; - workers.submit(() -> fireCleanupExecution(fAttempt)); - } - }); - } - - void onStop(@Observes ShutdownEvent evt) throws SchedulerException { - activator.shutdown(); - cleaner.shutdown(); - clearQueues(); + // Apply all enabled rules to matching targets + for (Rule rule : enabledRules()) { + try { + QuarkusTransaction.requiringNew().run(() -> applyRuleToMatchingTargets(rule)); + } catch (Exception e) { + logger.error(e); + } + } } - /** - * Clear activation and cleanup queues. This is primarily useful for testing to ensure clean - * state between test runs. - */ - public void clearQueues() { - activations.clear(); - cleanups.clear(); - logger.debugv( - "Cleared activation and cleanup queues (activations: {0}, cleanups: {1})", - activations.size(), cleanups.size()); + void onStop(@Observes ShutdownEvent evt) { + try { + quartz.shutdown(); + } catch (SchedulerException e) { + logger.error("Failed to shutdown Quartz scheduler", e); + } } @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) @Transactional void onMessage(TargetDiscovery event) { if (event.serviceRef().id == null) { - // target is not persisted yet, skip and wait for an update after it is return; } switch (event.kind()) { case MODIFIED: - // fall-through case FOUND: if (event.serviceRef().isConnectable()) { applyRulesToTarget(event.serviceRef()); } break; case LOST: - resetActivations(event.serviceRef()); + cancelJobsForTarget(event.serviceRef()); break; default: break; @@ -193,188 +148,343 @@ public void handleRuleModification(RuleEvent event) { public void handleRuleRecordingCleanup(Rule rule) { var targets = evaluator.getMatchedTargets(rule.matchExpression); for (var target : targets) { - CleanupAttempt attempt = new CleanupAttempt(rule, target); - cleanups.add(attempt); - logger.debugv( - "Queued cleanup for recording \"{0}\" on target {1}", - rule.getRecordingName(), target.id); + scheduleCleanupJob(rule, target); } } - private void fireAttemptExecution(ActivationAttempt fAttempt) { + @Retry( + maxRetries = 12, + delay = 1, + delayUnit = ChronoUnit.SECONDS, + maxDuration = 5, + durationUnit = ChronoUnit.MINUTES, + jitter = 200, + retryOn = {Exception.class}, + abortOn = { + NoResultException.class, + NotFoundException.class, + PermanentFailureException.class + }) + @Bulkhead(value = 10, waitingTaskQueue = 255) + @CircuitBreaker( + requestVolumeThreshold = 10, + failureRatio = 0.5, + delay = 30, + delayUnit = ChronoUnit.SECONDS, + successThreshold = 3) + @Timeout(value = 30, unit = ChronoUnit.SECONDS) + @Asynchronous + public Uni activateRule(String ruleName, String jvmId) { + RuleActivationEvent event = new RuleActivationEvent(); + event.begin(); + event.ruleName = ruleName; + event.jvmId = jvmId; + try { - logger.tracev( - "Attempting to activate rule \"{0}\" for" + " target {1} - attempt #{2}", - fAttempt.ruleId, fAttempt.targetId, fAttempt.attempts); - bus.request(RuleExecutor.class.getName(), fAttempt) - .await() - .atMost(connectionFailedTimeout); - logger.tracev( - "Activated rule \"{0}\" for target {1}", fAttempt.ruleId, fAttempt.targetId); + return QuarkusTransaction.joiningExisting() + .call( + () -> { + var targetOpt = Target.getTargetByJvmId(jvmId); + if (targetOpt.isEmpty()) { + logger.warnv( + "Target with jvmId {0} no longer exists, aborting rule" + + " activation", + jvmId); + if (event.shouldCommit()) { + event.success = false; + event.permanentFailure = true; + event.errorMessage = "Target not found"; + event.commit(); + } + throw new PermanentFailureException( + "Target with jvmId " + jvmId + " not found"); + } + Target target = targetOpt.get(); + + var ruleOpt = + Rule.find("name", ruleName).firstResultOptional(); + if (ruleOpt.isEmpty()) { + logger.warnv( + "Rule {0} no longer exists, aborting activation", + ruleName); + if (event.shouldCommit()) { + event.success = false; + event.permanentFailure = true; + event.errorMessage = "Rule not found"; + event.commit(); + } + throw new PermanentFailureException( + "Rule " + ruleName + " not found"); + } + Rule rule = ruleOpt.get(); + + return ruleExecutor + .activate(target, rule) + .onItem() + .invoke( + () -> { + if (event.shouldCommit()) { + event.success = true; + event.commit(); + } + }) + .onFailure() + .invoke( + t -> { + if (event.shouldCommit()) { + event.success = false; + event.errorMessage = t.getMessage(); + event.commit(); + } + }); + }); + } catch (PermanentFailureException e) { + return Uni.createFrom().failure(e); } catch (Exception e) { - if (fAttempt != null) { - int count = fAttempt.attempts.incrementAndGet(); - int delay = (int) Math.pow(2, count); - TimeUnit unit = TimeUnit.SECONDS; - int limit = 12; - if (count < limit) { - logger.debugv( - "Rule \"{0}\" activation attempt" - + " #{1} for target {2} failed," - + " rescheduling in {3}{4} ...", - fAttempt.ruleId, count - 1, fAttempt.targetId, delay, unit); - Infrastructure.getDefaultWorkerPool() - .schedule(() -> activations.add(fAttempt), delay, unit); - } else { - logger.errorv( - "Rule \"{0}\" activation attempt" - + " #{1} failed for target {2}" - + " - limit ({3}) reached! Will" - + " not retry...", - fAttempt.ruleId, count, fAttempt.targetId, limit); - } + if (event.shouldCommit()) { + event.success = false; + event.errorMessage = e.getMessage(); + event.commit(); } - logger.error(e); + return Uni.createFrom().failure(e); } } - private void fireCleanupExecution(CleanupAttempt fAttempt) { - try { - logger.tracev( - "Attempting to cleanup recording \"{0}\" on target {1} - attempt #{2}", - fAttempt.recordingName, fAttempt.targetId, fAttempt.attempts.get()); + @Retry( + maxRetries = 5, + delay = 1, + delayUnit = ChronoUnit.SECONDS, + maxDuration = 2, + durationUnit = ChronoUnit.MINUTES, + jitter = 200, + retryOn = {Exception.class}, + abortOn = { + NoResultException.class, + NotFoundException.class, + PermanentFailureException.class + }) + @Bulkhead(value = 10, waitingTaskQueue = 255) + @CircuitBreaker( + requestVolumeThreshold = 10, + failureRatio = 0.5, + delay = 30, + delayUnit = ChronoUnit.SECONDS) + @Timeout(value = 30, unit = ChronoUnit.SECONDS) + @Asynchronous + public Uni cleanupRecording(String ruleName, String jvmId, String recordingName) { + RecordingCleanupEvent event = new RecordingCleanupEvent(); + event.begin(); + event.ruleName = ruleName; + event.jvmId = jvmId; + event.recordingName = recordingName; - QuarkusTransaction.requiringNew() - .run( + try { + return QuarkusTransaction.joiningExisting() + .call( () -> { - var targetOpt = - Target.find("id", fAttempt.targetId) - .firstResultOptional(); + var targetOpt = Target.getTargetByJvmId(jvmId); if (targetOpt.isEmpty()) { logger.infov( - "Target {0} no longer exists, cleanup complete", - fAttempt.targetId); - return; + "Target with jvmId {0} no longer exists, cleanup" + + " complete", + jvmId); + if (event.shouldCommit()) { + event.success = true; + event.skipped = true; + event.commit(); + } + throw new PermanentFailureException( + "Target not found - cleanup complete"); } Target target = targetOpt.get(); var recordingOpt = recordingHelper.getActiveRecording( - target, - r -> - Objects.equals( - r.name, fAttempt.recordingName)); + target, r -> Objects.equals(r.name, recordingName)); + if (recordingOpt.isEmpty()) { logger.infov( "Recording \"{0}\" no longer exists on target {1}," + " cleanup complete", - fAttempt.recordingName, fAttempt.targetId); - return; + recordingName, jvmId); + if (event.shouldCommit()) { + event.success = true; + event.skipped = true; + event.commit(); + } + throw new PermanentFailureException( + "Recording not found - cleanup complete"); } - var recording = recordingOpt.get(); - try { - recordingHelper.stopRecording(recording).await().indefinitely(); - logger.infov( - "Successfully stopped recording \"{0}\" on target {1}", - fAttempt.recordingName, fAttempt.targetId); - } catch (Exception stopEx) { - throw new RuntimeException(stopEx); - } + return recordingHelper + .stopRecording(recordingOpt.get()) + .onItem() + .invoke( + () -> { + if (event.shouldCommit()) { + event.success = true; + event.commit(); + } + }) + .onFailure() + .invoke( + t -> { + if (event.shouldCommit()) { + event.success = false; + event.errorMessage = t.getMessage(); + event.commit(); + } + }) + .replaceWithVoid(); }); + } catch (PermanentFailureException e) { + return Uni.createFrom().failure(e); } catch (Exception e) { - if (fAttempt != null) { - int count = fAttempt.attempts.incrementAndGet(); - int delay = (int) Math.pow(2, count); - TimeUnit unit = TimeUnit.SECONDS; - int limit = 5; - - // Check if this is a permanent failure (target/recording gone) - boolean isPermanentFailure = isPermanentCleanupFailure(e); - - if (isPermanentFailure) { - logger.infov( - "Recording \"{0}\" cleanup on target {1} failed permanently: {2}." - + " Will not retry.", - fAttempt.recordingName, fAttempt.targetId, e.getMessage()); - } else if (count < limit) { - logger.debugv( - "Recording \"{0}\" cleanup attempt #{1} on target {2} failed," - + " rescheduling in {3}{4} ...", - fAttempt.recordingName, count - 1, fAttempt.targetId, delay, unit); - Infrastructure.getDefaultWorkerPool() - .schedule(() -> cleanups.add(fAttempt), delay, unit); - } else { - logger.errorv( - "Recording \"{0}\" cleanup attempt #{1} failed on target {2}" - + " - limit ({3}) reached! Will not retry...", - fAttempt.recordingName, count, fAttempt.targetId, limit); - } - } - if (!isPermanentCleanupFailure(e)) { - logger.warn(e); + if (event.shouldCommit()) { + event.success = false; + event.errorMessage = e.getMessage(); + event.commit(); } + return Uni.createFrom().failure(e); } } - private boolean isPermanentCleanupFailure(Throwable t) { - if (t == null) { - return false; - } - // Target deleted or recording already stopped/deleted - if (t instanceof NoResultException) { - return true; - } - if (t instanceof NotFoundException) { - return true; - } - if (t instanceof HttpException httpEx) { - // 404 = recording not found, 400 = bad request (recording already stopped) - return httpEx.getStatusCode() == 404 || httpEx.getStatusCode() == 400; + void applyRulesToTarget(Target target) { + if (target.jvmId == null) { + logger.warnv("Target {0} has no jvmId, cannot apply rules", target.connectUrl); + return; } - // Check cause recursively - return isPermanentCleanupFailure(t.getCause()); - } - private void resetActivations(Rule rule) { - resetActivations(a -> a.ruleId == rule.id); + for (var rule : enabledRules()) { + try { + if (evaluator.applies(rule.matchExpression, target)) { + scheduleActivationJob(rule, target); + } + } catch (ScriptException se) { + logger.error(se); + } + } } - private void resetActivations(Target target) { - resetActivations(a -> a.targetId == target.id); - resetCleanups(c -> c.targetId == target.id); + void applyRuleToMatchingTargets(Rule rule) { + var targets = evaluator.getMatchedTargets(rule.matchExpression); + for (var target : targets) { + if (target.jvmId != null) { + scheduleActivationJob(rule, target); + } else { + logger.warnv( + "Target {0} has no jvmId, cannot apply rule {1}", + target.connectUrl, rule.name); + } + } } - private void resetActivations(Predicate p) { - Iterator it = activations.iterator(); - while (it.hasNext()) { - ActivationAttempt attempt = it.next(); - if (p.test(attempt)) { - it.remove(); + private void scheduleActivationJob(Rule rule, Target target) { + try { + JobDetail job = + JobBuilder.newJob(RuleActivationJob.class) + .withIdentity( + "activation-" + rule.name + "-" + target.jvmId, + "rule-activations") + .usingJobData("ruleName", rule.name) + .usingJobData("jvmId", target.jvmId) + .build(); + + if (quartz.checkExists(job.getKey())) { + logger.debugv( + "Activation job already exists: rule={0} jvmId={1}", + rule.name, target.jvmId); + return; } + + Trigger trigger = + TriggerBuilder.newTrigger() + .withIdentity(job.getKey().getName(), job.getKey().getGroup()) + .startNow() + .build(); + + quartz.scheduleJob(job, trigger); + logger.debugv("Scheduled activation job: rule={0} jvmId={1}", rule.name, target.jvmId); + + } catch (SchedulerException e) { + logger.errorv( + e, + "Failed to schedule activation job: rule={0} jvmId={1}", + rule.name, + target.jvmId); } } - private void resetCleanups(Predicate p) { - Iterator it = cleanups.iterator(); - while (it.hasNext()) { - CleanupAttempt attempt = it.next(); - if (p.test(attempt)) { - it.remove(); + private void scheduleCleanupJob(Rule rule, Target target) { + if (target.jvmId == null) { + logger.warnv("Target {0} has no jvmId, cannot schedule cleanup", target.connectUrl); + return; + } + + try { + JobDetail job = + JobBuilder.newJob(RecordingCleanupJob.class) + .withIdentity( + "cleanup-" + rule.name + "-" + target.jvmId, "rule-cleanups") + .usingJobData("ruleName", rule.name) + .usingJobData("jvmId", target.jvmId) + .usingJobData("recordingName", rule.getRecordingName()) + .build(); + + if (quartz.checkExists(job.getKey())) { + logger.debugv( + "Cleanup job already exists: rule={0} jvmId={1}", rule.name, target.jvmId); + return; } + + Trigger trigger = + TriggerBuilder.newTrigger() + .withIdentity(job.getKey().getName(), job.getKey().getGroup()) + .startNow() + .build(); + + quartz.scheduleJob(job, trigger); + logger.debugv("Scheduled cleanup job: rule={0} jvmId={1}", rule.name, target.jvmId); + + } catch (SchedulerException e) { + logger.errorv( + e, + "Failed to schedule cleanup job: rule={0} jvmId={1}", + rule.name, + target.jvmId); } } - void applyRulesToTarget(Target target) { - resetActivations(target); - for (var rule : enabledRules()) { - try { - if (evaluator.applies(rule.matchExpression, target)) { - activations.add(new ActivationAttempt(rule, target)); + private void cancelJobsForTarget(Target target) { + if (target.jvmId == null) { + return; + } + + try { + Set activationJobs = + quartz.getJobKeys(GroupMatcher.jobGroupEquals("rule-activations")); + Set cleanupJobs = + quartz.getJobKeys(GroupMatcher.jobGroupEquals("rule-cleanups")); + + for (JobKey key : activationJobs) { + if (key.getName().endsWith("-" + target.jvmId)) { + quartz.deleteJob(key); + logger.debugv( + "Cancelled activation job {0} for lost target {1}", + key.getName(), target.jvmId); + } + } + + for (JobKey key : cleanupJobs) { + if (key.getName().endsWith("-" + target.jvmId)) { + quartz.deleteJob(key); + logger.debugv( + "Cancelled cleanup job {0} for lost target {1}", + key.getName(), target.jvmId); } - } catch (ScriptException se) { - logger.error(se); } + } catch (SchedulerException e) { + logger.errorv(e, "Failed to cancel jobs for target {0}", target.jvmId); } } @@ -386,87 +496,63 @@ private static List enabledRules() { .toList(); } - void applyRuleToMatchingTargets(Rule r) { - resetActivations(r); - var targets = evaluator.getMatchedTargets(r.matchExpression); - for (var target : targets) { - activations.add(new ActivationAttempt(r, target)); + /** + * Exception indicating a permanent failure that should not be retried. Used for cases like + * target/rule not found where retrying won't help. + * + *

When thrown, SmallRye Fault Tolerance will abort retries and the Quartz job will be + * unscheduled. + */ + public static class PermanentFailureException extends RuntimeException { + public PermanentFailureException(String message) { + super(message); } } - @SuppressFBWarnings("EI_EXPOSE_REP") - public record RuleRecording(Rule rule, ActiveRecording recording) { - public RuleRecording { - Objects.requireNonNull(rule); - Objects.requireNonNull(recording); - } - } + @Label("Rule Activation") + @Description("Tracks rule activation attempts with success/failure status") + @SuppressFBWarnings( + value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD", + justification = "JFR event fields are read by JFR infrastructure via reflection") + public static class RuleActivationEvent extends Event { + @Label("Rule Name") + public String ruleName; - @SuppressFBWarnings("EI_EXPOSE_REP") - public record ActivationAttempt(long ruleId, long targetId, AtomicInteger attempts) { - public ActivationAttempt(Rule rule, Target target) { - this(rule.id, target.id, new AtomicInteger(0)); - } + @Label("JVM ID") + public String jvmId; - public ActivationAttempt { - Objects.requireNonNull(attempts); - if (ruleId < 0) { - throw new IllegalArgumentException(); - } - if (targetId < 0) { - throw new IllegalArgumentException(); - } - if (attempts.get() < 0) { - throw new IllegalArgumentException(); - } - } + @Label("Success") + public boolean success; - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (!(obj instanceof ActivationAttempt other)) return false; - return ruleId == other.ruleId && targetId == other.targetId; - } + @Label("Permanent Failure") + public boolean permanentFailure; - @Override - public int hashCode() { - return Objects.hash(ruleId, targetId); - } + @Label("Error Message") + public String errorMessage; } - @SuppressFBWarnings("EI_EXPOSE_REP") - public record CleanupAttempt( - String ruleName, long targetId, String recordingName, AtomicInteger attempts) { - public CleanupAttempt(Rule rule, Target target) { - this(rule.name, target.id, rule.getRecordingName(), new AtomicInteger(0)); - } + @Label("Recording Cleanup") + @Description("Tracks recording cleanup attempts with success/failure status") + @SuppressFBWarnings( + value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD", + justification = "JFR event fields are read by JFR infrastructure via reflection") + public static class RecordingCleanupEvent extends Event { + @Label("Rule Name") + public String ruleName; - public CleanupAttempt { - Objects.requireNonNull(ruleName); - Objects.requireNonNull(recordingName); - Objects.requireNonNull(attempts); - if (targetId < 0) { - throw new IllegalArgumentException(); - } - if (attempts.get() < 0) { - throw new IllegalArgumentException(); - } - } + @Label("JVM ID") + public String jvmId; - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (!(obj instanceof CleanupAttempt other)) return false; - return targetId == other.targetId && Objects.equals(recordingName, other.recordingName); - } + @Label("Recording Name") + public String recordingName; - @Override - public int hashCode() { - return Objects.hash(targetId, recordingName); - } + @Label("Success") + public boolean success; - public int incrementAndGet() { - return this.attempts.incrementAndGet(); - } + @Label("Skipped") + public boolean skipped; + + @Label("Error Message") + public String errorMessage; } } diff --git a/src/test/java/io/cryostat/AbstractTransactionalTestBase.java b/src/test/java/io/cryostat/AbstractTransactionalTestBase.java index 87a115089..cf49682b3 100644 --- a/src/test/java/io/cryostat/AbstractTransactionalTestBase.java +++ b/src/test/java/io/cryostat/AbstractTransactionalTestBase.java @@ -15,8 +15,6 @@ */ package io.cryostat; -import io.cryostat.rules.RuleService; - import jakarta.inject.Inject; import jakarta.persistence.EntityManager; import org.flywaydb.core.Flyway; @@ -27,7 +25,6 @@ public abstract class AbstractTransactionalTestBase extends AbstractTestBase { @Inject Flyway flyway; @Inject EntityManager entityManager; - @Inject RuleService ruleService; @BeforeEach void migrateFlyway() throws SchedulerException { @@ -36,7 +33,6 @@ void migrateFlyway() throws SchedulerException { flyway.migrate(); flyway.validate(); entityManager.clear(); - ruleService.clearQueues(); restartScheduler(); } } diff --git a/src/test/java/io/cryostat/rules/RulesArchiverTest.java b/src/test/java/io/cryostat/rules/RulesArchiverTest.java index 86fb417a5..37030d501 100644 --- a/src/test/java/io/cryostat/rules/RulesArchiverTest.java +++ b/src/test/java/io/cryostat/rules/RulesArchiverTest.java @@ -20,8 +20,6 @@ import java.io.IOException; import java.time.Duration; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; import io.cryostat.AbstractTransactionalTestBase; @@ -59,9 +57,6 @@ public class RulesArchiverTest extends AbstractTransactionalTestBase { """, RULE_NAME); - private static final ScheduledExecutorService worker = - Executors.newSingleThreadScheduledExecutor(); - @AfterEach void cleanupRulesArchiverTest() { cleanupSelfActiveAndArchivedRecordings(); From d19dbe680701570df87430bdc4e5513ac5a46e37 Mon Sep 17 00:00:00 2001 From: Andrew Azores Date: Tue, 26 May 2026 12:17:14 -0400 Subject: [PATCH 15/15] request job recovery if Cryostat crashes during execution --- src/main/java/io/cryostat/rules/RuleService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index 779e19690..f80489825 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -388,6 +388,7 @@ private void scheduleActivationJob(Rule rule, Target target) { "rule-activations") .usingJobData("ruleName", rule.name) .usingJobData("jvmId", target.jvmId) + .requestRecovery() .build(); if (quartz.checkExists(job.getKey())) { @@ -429,6 +430,7 @@ private void scheduleCleanupJob(Rule rule, Target target) { .usingJobData("ruleName", rule.name) .usingJobData("jvmId", target.jvmId) .usingJobData("recordingName", rule.getRecordingName()) + .requestRecovery() .build(); if (quartz.checkExists(job.getKey())) {