diff --git a/src/main/docker/include/cryostat.jfc b/src/main/docker/include/cryostat.jfc index ba58198f2..7e0dfbd57 100644 --- a/src/main/docker/include/cryostat.jfc +++ b/src/main/docker/include/cryostat.jfc @@ -35,6 +35,22 @@ 0 ms + + + true + + + 0 ms + + + + + true + + + 0 ms + + true diff --git a/src/main/java/io/cryostat/recordings/RecordingHelper.java b/src/main/java/io/cryostat/recordings/RecordingHelper.java index 7f8861bc9..3e756e758 100644 --- a/src/main/java/io/cryostat/recordings/RecordingHelper.java +++ b/src/main/java/io/cryostat/recordings/RecordingHelper.java @@ -293,11 +293,19 @@ private List listActiveRecordingsImpl(Target target) { List descriptors = connectionManager.executeConnectedTask( target, conn -> conn.getService().getAvailableRecordings()); + + var remoteIds = + new HashSet<>(descriptors.stream().map(IRecordingDescriptor::getId).toList()); + boolean updated = false; var it = target.activeRecordings.iterator(); while (it.hasNext()) { var r = it.next(); - if (!previousIds.contains(r.remoteId)) { + if (!remoteIds.contains(r.remoteId)) { + logger.warnv( + "Orphaned recording detected: id={0} remoteId={1} name={2} on target" + + " {3}, removing from database", + r.id, r.remoteId, r.name, target.connectUrl); r.delete(); it.remove(); updated |= true; @@ -469,101 +477,137 @@ private ActiveRecording startRecordingImpl( } getActiveRecording(lockedTarget, r -> r.name.equals(recordingName)) .ifPresent(r -> this.deleteRecording(r).await().indefinitely()); - var desc = - connectionManager.executeConnectedTask( - lockedTarget, - conn -> { - RecordingOptionsBuilder optionsBuilder = - recordingOptionsBuilderFactory - .create(lockedTarget) - .name(recordingName); - if (options.duration().isPresent()) { - optionsBuilder = - optionsBuilder.duration( - TimeUnit.SECONDS.toMillis( - options.duration().get())); - } - if (options.toDisk().isPresent()) { - optionsBuilder = optionsBuilder.toDisk(options.toDisk().get()); - } - if (options.maxAge().isPresent()) { - optionsBuilder = optionsBuilder.maxAge(options.maxAge().get()); - } - if (options.maxSize().isPresent()) { - optionsBuilder = optionsBuilder.maxSize(options.maxSize().get()); - } - IConstrainedMap recordingOptions = optionsBuilder.build(); - - switch (template.getType()) { - case PRESET: - return conn.getService() - .start( - recordingOptions, - presetTemplateService - .getXml( - template.getName(), - TemplateType.CUSTOM) - .orElseThrow()); - case CUSTOM: - return conn.getService() - .start( - recordingOptions, - customTemplateService - .getXml( - template.getName(), - TemplateType.CUSTOM) - .orElseThrow()); - case TARGET: - return conn.getService().start(recordingOptions, template); - default: - throw new IllegalStateException( - "Unknown template type: " + template.getType()); - } - }); + + IRecordingDescriptor desc; + try { + desc = + connectionManager.executeConnectedTask( + lockedTarget, + conn -> { + RecordingOptionsBuilder optionsBuilder = + recordingOptionsBuilderFactory + .create(lockedTarget) + .name(recordingName); + if (options.duration().isPresent()) { + optionsBuilder = + optionsBuilder.duration( + TimeUnit.SECONDS.toMillis( + options.duration().get())); + } + if (options.toDisk().isPresent()) { + optionsBuilder = optionsBuilder.toDisk(options.toDisk().get()); + } + if (options.maxAge().isPresent()) { + optionsBuilder = optionsBuilder.maxAge(options.maxAge().get()); + } + if (options.maxSize().isPresent()) { + optionsBuilder = + optionsBuilder.maxSize(options.maxSize().get()); + } + IConstrainedMap recordingOptions = optionsBuilder.build(); + + switch (template.getType()) { + case PRESET: + return conn.getService() + .start( + recordingOptions, + presetTemplateService + .getXml( + template.getName(), + TemplateType.CUSTOM) + .orElseThrow()); + case CUSTOM: + return conn.getService() + .start( + recordingOptions, + customTemplateService + .getXml( + template.getName(), + TemplateType.CUSTOM) + .orElseThrow()); + case TARGET: + return conn.getService().start(recordingOptions, template); + default: + throw new IllegalStateException( + "Unknown template type: " + template.getType()); + } + }); + } catch (Exception e) { + logger.errorv( + e, + "Failed to start remote recording \"{0}\" on target {1}", + recordingName, + lockedTarget.connectUrl); + throw e; + } Map labels = new HashMap<>(rawLabels); labels.put("template.name", template.getName()); labels.put("template.type", template.getType().toString()); Metadata meta = new Metadata(labels); - ActiveRecording recording = ActiveRecording.from(lockedTarget, desc, meta, options); - - Optional existingOpt = - ActiveRecording.find( - "target.id = ?1 and remoteId = ?2", - lockedTarget.id, - recording.remoteId) - .firstResultOptional(); - - if (existingOpt.isPresent()) { - ActiveRecording existingRecording = existingOpt.get(); - logger.infov( - "Found existing recording id={0} remoteId={1} name={2}," - + " merging state from startRecording", - existingRecording.id, existingRecording.remoteId, existingRecording.name); - - existingRecording.name = recording.name; - existingRecording.state = recording.state; - existingRecording.duration = recording.duration; - existingRecording.startTime = recording.startTime; - existingRecording.archiveOnStop = recording.archiveOnStop; - existingRecording.continuous = recording.continuous; - existingRecording.toDisk = recording.toDisk; - existingRecording.maxSize = recording.maxSize; - existingRecording.maxAge = recording.maxAge; - existingRecording.external = recording.external; - - var mergedLabels = new HashMap<>(existingRecording.metadata.labels()); - mergedLabels.putAll(recording.metadata.labels()); - existingRecording.metadata = new Metadata(mergedLabels); - - existingRecording.persist(); - recording = existingRecording; - } else { - recording.persist(); - } + ActiveRecording recording; + try { + recording = ActiveRecording.from(lockedTarget, desc, meta, options); + + Optional existingOpt = + ActiveRecording.find( + "target.id = ?1 and remoteId = ?2", + lockedTarget.id, + recording.remoteId) + .firstResultOptional(); + + if (existingOpt.isPresent()) { + ActiveRecording existingRecording = existingOpt.get(); + logger.infov( + "Found existing recording id={0} remoteId={1} name={2}," + + " merging state from startRecording", + existingRecording.id, existingRecording.remoteId, existingRecording.name); + + existingRecording.name = recording.name; + existingRecording.state = recording.state; + existingRecording.duration = recording.duration; + existingRecording.startTime = recording.startTime; + existingRecording.archiveOnStop = recording.archiveOnStop; + existingRecording.continuous = recording.continuous; + existingRecording.toDisk = recording.toDisk; + existingRecording.maxSize = recording.maxSize; + existingRecording.maxAge = recording.maxAge; + existingRecording.external = recording.external; + + var mergedLabels = new HashMap<>(existingRecording.metadata.labels()); + mergedLabels.putAll(recording.metadata.labels()); + existingRecording.metadata = new Metadata(mergedLabels); + + existingRecording.persist(); + recording = existingRecording; + } else { + recording.persist(); + } - lockedTarget.activeRecordings.add(recording); + lockedTarget.activeRecordings.add(recording); + } catch (Exception e) { + logger.errorv( + e, + "Failed to persist recording \"{0}\" to database, attempting to stop remote" + + " recording", + recordingName); + try { + connectionManager.executeConnectedTask( + lockedTarget, + conn -> { + conn.getService().close(desc); + return null; + }); + } catch (Exception cleanupEx) { + logger.warnv( + cleanupEx, + "Failed to cleanup remote recording \"{0}\" after database persist" + + " failure", + recordingName); + } + throw e; + } if (!recording.continuous) { JobKey key = diff --git a/src/main/java/io/cryostat/rules/RecordingCleanupJob.java b/src/main/java/io/cryostat/rules/RecordingCleanupJob.java new file mode 100644 index 000000000..798fc8005 --- /dev/null +++ b/src/main/java/io/cryostat/rules/RecordingCleanupJob.java @@ -0,0 +1,65 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.rules; + +import jakarta.inject.Inject; +import org.jboss.logging.Logger; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +/** + * Quartz job that delegates to RuleService for fault-tolerant recording cleanup. + * + *

This job is scheduled when a recording needs to be cleaned up (stopped). It delegates to + * RuleService.cleanupRecording() which handles all retry logic, timeouts, and backpressure via + * SmallRye Fault Tolerance annotations. + * + *

The job is one-time only and unscheduled after execution (success or failure). + */ +@DisallowConcurrentExecution +public class RecordingCleanupJob implements Job { + + @Inject Logger logger; + @Inject RuleService ruleService; + + @Override + public void execute(JobExecutionContext ctx) throws JobExecutionException { + String ruleName = ctx.getMergedJobDataMap().getString("ruleName"); + String jvmId = ctx.getMergedJobDataMap().getString("jvmId"); + String recordingName = ctx.getMergedJobDataMap().getString("recordingName"); + + logger.debugv( + "Executing cleanup job: rule={0} jvmId={1} recording={2}", + ruleName, jvmId, recordingName); + + try { + ruleService.cleanupRecording(ruleName, jvmId, recordingName).await().indefinitely(); + logger.debugv("Recording cleanup completed: rule={0} jvmId={1}", ruleName, jvmId); + ctx.getScheduler().unscheduleJob(ctx.getTrigger().getKey()); + } catch (Exception e) { + logger.errorv( + e, + "Recording cleanup failed after all retries: rule={0} jvmId={1}", + ruleName, + jvmId); + JobExecutionException ex = new JobExecutionException(e); + ex.setUnscheduleFiringTrigger(true); + throw ex; + } + } +} diff --git a/src/main/java/io/cryostat/rules/RuleActivationJob.java b/src/main/java/io/cryostat/rules/RuleActivationJob.java new file mode 100644 index 000000000..90d580448 --- /dev/null +++ b/src/main/java/io/cryostat/rules/RuleActivationJob.java @@ -0,0 +1,62 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cryostat.rules; + +import jakarta.inject.Inject; +import org.jboss.logging.Logger; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +/** + * Quartz job that delegates to RuleService for fault-tolerant rule activation. + * + *

This job is scheduled when a rule needs to be activated on a target. It delegates to + * RuleService.activateRule() which handles all retry logic, timeouts, and backpressure via SmallRye + * Fault Tolerance annotations. + * + *

The job is one-time only and unscheduled after execution (success or failure). + */ +@DisallowConcurrentExecution +public class RuleActivationJob implements Job { + + @Inject Logger logger; + @Inject RuleService ruleService; + + @Override + public void execute(JobExecutionContext ctx) throws JobExecutionException { + String ruleName = ctx.getMergedJobDataMap().getString("ruleName"); + String jvmId = ctx.getMergedJobDataMap().getString("jvmId"); + + logger.debugv("Executing rule activation job: rule={0} jvmId={1}", ruleName, jvmId); + + try { + ruleService.activateRule(ruleName, jvmId).await().indefinitely(); + logger.debugv("Rule activation completed: rule={0} jvmId={1}", ruleName, jvmId); + ctx.getScheduler().unscheduleJob(ctx.getTrigger().getKey()); + } catch (Exception e) { + logger.errorv( + e, + "Rule activation failed after all retries: rule={0} jvmId={1}", + ruleName, + jvmId); + JobExecutionException ex = new JobExecutionException(e); + ex.setUnscheduleFiringTrigger(true); + throw ex; + } + } +} diff --git a/src/main/java/io/cryostat/rules/RuleExecutor.java b/src/main/java/io/cryostat/rules/RuleExecutor.java index cd8659d98..7d7f7e033 100644 --- a/src/main/java/io/cryostat/rules/RuleExecutor.java +++ b/src/main/java/io/cryostat/rules/RuleExecutor.java @@ -33,7 +33,6 @@ import io.cryostat.recordings.RecordingHelper.RecordingOptions; import io.cryostat.recordings.RecordingHelper.RecordingReplace; import io.cryostat.rules.Rule.RuleEvent; -import io.cryostat.rules.RuleService.ActivationAttempt; import io.cryostat.targets.Target; import io.cryostat.targets.Target.TargetDiscovery; import io.cryostat.util.EntityExistsException; @@ -82,70 +81,6 @@ void onStop(@Observes ShutdownEvent evt) throws SchedulerException { quartz.shutdown(); } - @ConsumeEvent(blocking = true) - @Transactional - Uni onMessage(ActivationAttempt attempt) { - logger.tracev( - "Attempting to activate rule \"{0}\" for target {1} - attempt #{2}", - attempt.ruleId(), attempt.targetId(), attempt.attempts()); - try { - var targetOpt = Target.find("id", attempt.targetId()).firstResultOptional(); - if (targetOpt.isEmpty()) { - logger.warnv( - "Target {0} no longer exists, skipping rule activation attempt", - attempt.targetId()); - return Uni.createFrom().nullItem(); - } - Target target = targetOpt.get(); - - var ruleOpt = Rule.find("id", attempt.ruleId()).firstResultOptional(); - if (ruleOpt.isEmpty()) { - logger.warnv( - "Rule {0} no longer exists, skipping activation attempt", attempt.ruleId()); - return Uni.createFrom().nullItem(); - } - Rule rule = ruleOpt.get(); - - Pair pair = - recordingHelper.parseEventSpecifier(rule.eventSpecifier); - Template template = - recordingHelper.getPreferredTemplate(target, pair.getKey(), pair.getValue()); - - var priorRecording = - recordingHelper.getActiveRecording( - target, r -> Objects.equals(r.name, rule.getRecordingName())); - if (priorRecording.isPresent()) { - recordingHelper.stopRecording(priorRecording.get()).await().indefinitely(); - } - var labels = new HashMap<>(rule.metadata.labels()); - labels.put(RULE_LABEL_KEY, rule.name); - ActiveRecording recording = null; - try { - recording = - recordingHelper - .startRecording( - target, - RecordingReplace.STOPPED, - template, - createRecordingOptions(rule), - labels) - .await() - .indefinitely(); - } catch (EntityExistsException eee) { - // ignore - the recording already existed and was running, so we don't want to - // replace it - but we should continue on to reschedule the periodic archival job - } - if (recording != null && rule.isArchiver()) { - scheduleArchival(rule, target, recording); - } - } catch (Exception e) { - logger.error("Rule execution failed", e); - return Uni.createFrom().failure(e); - } - - return Uni.createFrom().nullItem(); - } - @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) void onMessage(TargetDiscovery event) { switch (event.kind()) { @@ -201,6 +136,69 @@ public void handleRuleRecordingCleanup(Rule rule) { logger.debugv("Cancelled scheduled tasks for rule \"{0}\"", rule.name); } + public Uni activate(Target target, Rule rule) { + try { + Pair pair = + recordingHelper.parseEventSpecifier(rule.eventSpecifier); + Template template = + recordingHelper.getPreferredTemplate(target, pair.getKey(), pair.getValue()); + + var priorRecording = + recordingHelper.getActiveRecording( + target, r -> Objects.equals(r.name, rule.getRecordingName())); + if (priorRecording.isPresent()) { + recordingHelper.stopRecording(priorRecording.get()).await().indefinitely(); + } + var labels = new HashMap<>(rule.metadata.labels()); + labels.put(RULE_LABEL_KEY, rule.name); + ActiveRecording recording = null; + try { + recording = + recordingHelper + .startRecording( + target, + RecordingReplace.STOPPED, + template, + createRecordingOptions(rule), + labels) + .await() + .atMost(java.time.Duration.ofSeconds(30)); + } catch (EntityExistsException eee) { + logger.debugv( + "Recording \"{0}\" already exists on target {1}, fetching existing" + + " recording", + rule.getRecordingName(), target.id); + var existingRecording = + recordingHelper.getActiveRecording( + target, r -> Objects.equals(r.name, rule.getRecordingName())); + if (existingRecording.isPresent()) { + recording = existingRecording.get(); + } else { + logger.warnv( + "Recording \"{0}\" should exist on target {1} but was not found", + rule.getRecordingName(), target.id); + } + } + if (recording != null && rule.isArchiver()) { + scheduleArchival(rule, target, recording); + } else if (recording == null) { + logger.errorv( + "Failed to activate rule \"{0}\" on target {1}: recording is null", + rule.name, target.connectUrl); + throw new IllegalStateException( + "Recording activation failed but no exception was thrown"); + } + return Uni.createFrom().nullItem(); + } catch (Exception e) { + logger.errorv( + e, + "Rule \"{0}\" activation failed on target {1}", + rule.name, + target.connectUrl); + return Uni.createFrom().failure(e); + } + } + private void cancelTasksForRule(Rule rule) { if (rule.isArchiver()) { List targets = diff --git a/src/main/java/io/cryostat/rules/RuleService.java b/src/main/java/io/cryostat/rules/RuleService.java index dcc21853b..f80489825 100644 --- a/src/main/java/io/cryostat/rules/RuleService.java +++ b/src/main/java/io/cryostat/rules/RuleService.java @@ -16,21 +16,13 @@ package io.cryostat.rules; import java.time.Duration; -import java.util.Comparator; -import java.util.Iterator; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Objects; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; +import java.util.Set; import io.cryostat.ConfigProperties; import io.cryostat.expressions.MatchExpressionEvaluator; -import io.cryostat.recordings.ActiveRecording; import io.cryostat.recordings.RecordingHelper; import io.cryostat.rules.Rule.RuleEvent; import io.cryostat.targets.Target; @@ -41,19 +33,32 @@ import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.quarkus.vertx.ConsumeEvent; -import io.smallrye.mutiny.infrastructure.Infrastructure; -import io.vertx.ext.web.handler.HttpException; -import io.vertx.mutiny.core.eventbus.EventBus; +import io.smallrye.mutiny.Uni; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import jakarta.persistence.NoResultException; import jakarta.transaction.Transactional; import jakarta.ws.rs.NotFoundException; +import jdk.jfr.Description; +import jdk.jfr.Event; +import jdk.jfr.Label; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.faulttolerance.Asynchronous; +import org.eclipse.microprofile.faulttolerance.Bulkhead; +import org.eclipse.microprofile.faulttolerance.CircuitBreaker; +import org.eclipse.microprofile.faulttolerance.Retry; +import org.eclipse.microprofile.faulttolerance.Timeout; import org.jboss.logging.Logger; import org.projectnessie.cel.tools.ScriptException; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.Scheduler; import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; +import org.quartz.impl.matchers.GroupMatcher; /** * Handle {@link io.cryostat.target.Target} instances appearing and disappearing, and {@link @@ -71,86 +76,47 @@ public class RuleService { @Inject Logger logger; @Inject MatchExpressionEvaluator evaluator; @Inject RecordingHelper recordingHelper; - @Inject EventBus bus; + @Inject RuleExecutor ruleExecutor; + @Inject Scheduler quartz; @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) Duration connectionFailedTimeout; - private final BlockingQueue activations = - new PriorityBlockingQueue<>(255, Comparator.comparing(t -> t.attempts.get())); - private final BlockingQueue cleanups = - new PriorityBlockingQueue<>(255, Comparator.comparing(t -> t.attempts.get())); - private final ExecutorService activator = Executors.newSingleThreadExecutor(); - private final ExecutorService cleaner = Executors.newSingleThreadExecutor(); - private final ExecutorService workers = Executors.newVirtualThreadPerTaskExecutor(); - void onStart(@Observes StartupEvent ev) { logger.trace("RuleService started"); - activator.submit( - () -> { - for (Rule rule : enabledRules()) { - try { - QuarkusTransaction.requiringNew() - .run(() -> applyRuleToMatchingTargets(rule)); - } catch (Exception e) { - logger.error(e); - } - } - }); - activator.submit( - () -> { - while (!activator.isShutdown()) { - ActivationAttempt attempt = null; - try { - attempt = activations.take(); - } catch (InterruptedException ie) { - logger.trace(ie); - break; - } - final ActivationAttempt fAttempt = attempt; - workers.submit(() -> fireAttemptExecution(fAttempt)); - } - }); - cleaner.submit( - () -> { - while (!cleaner.isShutdown()) { - CleanupAttempt attempt = null; - try { - attempt = cleanups.take(); - } catch (InterruptedException ie) { - logger.trace(ie); - break; - } - final CleanupAttempt fAttempt = attempt; - workers.submit(() -> fireCleanupExecution(fAttempt)); - } - }); + // Apply all enabled rules to matching targets + for (Rule rule : enabledRules()) { + try { + QuarkusTransaction.requiringNew().run(() -> applyRuleToMatchingTargets(rule)); + } catch (Exception e) { + logger.error(e); + } + } } - void onStop(@Observes ShutdownEvent evt) throws SchedulerException { - activator.shutdown(); - cleaner.shutdown(); - activations.clear(); - cleanups.clear(); + void onStop(@Observes ShutdownEvent evt) { + try { + quartz.shutdown(); + } catch (SchedulerException e) { + logger.error("Failed to shutdown Quartz scheduler", e); + } } @ConsumeEvent(value = Target.TARGET_JVM_DISCOVERY, blocking = true) @Transactional void onMessage(TargetDiscovery event) { if (event.serviceRef().id == null) { - // target is not persisted yet, skip and wait for an update after it is return; } switch (event.kind()) { case MODIFIED: - // fall-through case FOUND: if (event.serviceRef().isConnectable()) { applyRulesToTarget(event.serviceRef()); } break; case LOST: - resetActivations(event.serviceRef()); + cancelJobsForTarget(event.serviceRef()); break; default: break; @@ -182,188 +148,345 @@ public void handleRuleModification(RuleEvent event) { public void handleRuleRecordingCleanup(Rule rule) { var targets = evaluator.getMatchedTargets(rule.matchExpression); for (var target : targets) { - CleanupAttempt attempt = new CleanupAttempt(rule, target); - cleanups.add(attempt); - logger.debugv( - "Queued cleanup for recording \"{0}\" on target {1}", - rule.getRecordingName(), target.id); + scheduleCleanupJob(rule, target); } } - private void fireAttemptExecution(ActivationAttempt fAttempt) { + @Retry( + maxRetries = 12, + delay = 1, + delayUnit = ChronoUnit.SECONDS, + maxDuration = 5, + durationUnit = ChronoUnit.MINUTES, + jitter = 200, + retryOn = {Exception.class}, + abortOn = { + NoResultException.class, + NotFoundException.class, + PermanentFailureException.class + }) + @Bulkhead(value = 10, waitingTaskQueue = 255) + @CircuitBreaker( + requestVolumeThreshold = 10, + failureRatio = 0.5, + delay = 30, + delayUnit = ChronoUnit.SECONDS, + successThreshold = 3) + @Timeout(value = 30, unit = ChronoUnit.SECONDS) + @Asynchronous + public Uni activateRule(String ruleName, String jvmId) { + RuleActivationEvent event = new RuleActivationEvent(); + event.begin(); + event.ruleName = ruleName; + event.jvmId = jvmId; + try { - logger.tracev( - "Attempting to activate rule \"{0}\" for" + " target {1} - attempt #{2}", - fAttempt.ruleId, fAttempt.targetId, fAttempt.attempts); - bus.request(RuleExecutor.class.getName(), fAttempt) - .await() - .atMost(connectionFailedTimeout); - logger.tracev( - "Activated rule \"{0}\" for target {1}", fAttempt.ruleId, fAttempt.targetId); + return QuarkusTransaction.joiningExisting() + .call( + () -> { + var targetOpt = Target.getTargetByJvmId(jvmId); + if (targetOpt.isEmpty()) { + logger.warnv( + "Target with jvmId {0} no longer exists, aborting rule" + + " activation", + jvmId); + if (event.shouldCommit()) { + event.success = false; + event.permanentFailure = true; + event.errorMessage = "Target not found"; + event.commit(); + } + throw new PermanentFailureException( + "Target with jvmId " + jvmId + " not found"); + } + Target target = targetOpt.get(); + + var ruleOpt = + Rule.find("name", ruleName).firstResultOptional(); + if (ruleOpt.isEmpty()) { + logger.warnv( + "Rule {0} no longer exists, aborting activation", + ruleName); + if (event.shouldCommit()) { + event.success = false; + event.permanentFailure = true; + event.errorMessage = "Rule not found"; + event.commit(); + } + throw new PermanentFailureException( + "Rule " + ruleName + " not found"); + } + Rule rule = ruleOpt.get(); + + return ruleExecutor + .activate(target, rule) + .onItem() + .invoke( + () -> { + if (event.shouldCommit()) { + event.success = true; + event.commit(); + } + }) + .onFailure() + .invoke( + t -> { + if (event.shouldCommit()) { + event.success = false; + event.errorMessage = t.getMessage(); + event.commit(); + } + }); + }); + } catch (PermanentFailureException e) { + return Uni.createFrom().failure(e); } catch (Exception e) { - if (fAttempt != null) { - int count = fAttempt.attempts.incrementAndGet(); - int delay = (int) Math.pow(2, count); - TimeUnit unit = TimeUnit.SECONDS; - int limit = 5; - if (count < limit) { - logger.debugv( - "Rule \"{0}\" activation attempt" - + " #{1} for target {2} failed," - + " rescheduling in {3}{4} ...", - fAttempt.ruleId, count - 1, fAttempt.targetId, delay, unit); - Infrastructure.getDefaultWorkerPool() - .schedule(() -> activations.add(fAttempt), delay, unit); - } else { - logger.errorv( - "Rule \"{0}\" activation attempt" - + " #{1} failed for target {2}" - + " - limit ({3}) reached! Will" - + " not retry...", - fAttempt.ruleId, count, fAttempt.targetId, limit); - } + if (event.shouldCommit()) { + event.success = false; + event.errorMessage = e.getMessage(); + event.commit(); } - logger.error(e); + return Uni.createFrom().failure(e); } } - private void fireCleanupExecution(CleanupAttempt fAttempt) { - try { - logger.tracev( - "Attempting to cleanup recording \"{0}\" on target {1} - attempt #{2}", - fAttempt.recordingName, fAttempt.targetId, fAttempt.attempts.get()); + @Retry( + maxRetries = 5, + delay = 1, + delayUnit = ChronoUnit.SECONDS, + maxDuration = 2, + durationUnit = ChronoUnit.MINUTES, + jitter = 200, + retryOn = {Exception.class}, + abortOn = { + NoResultException.class, + NotFoundException.class, + PermanentFailureException.class + }) + @Bulkhead(value = 10, waitingTaskQueue = 255) + @CircuitBreaker( + requestVolumeThreshold = 10, + failureRatio = 0.5, + delay = 30, + delayUnit = ChronoUnit.SECONDS) + @Timeout(value = 30, unit = ChronoUnit.SECONDS) + @Asynchronous + public Uni cleanupRecording(String ruleName, String jvmId, String recordingName) { + RecordingCleanupEvent event = new RecordingCleanupEvent(); + event.begin(); + event.ruleName = ruleName; + event.jvmId = jvmId; + event.recordingName = recordingName; - QuarkusTransaction.requiringNew() - .run( + try { + return QuarkusTransaction.joiningExisting() + .call( () -> { - var targetOpt = - Target.find("id", fAttempt.targetId) - .firstResultOptional(); + var targetOpt = Target.getTargetByJvmId(jvmId); if (targetOpt.isEmpty()) { logger.infov( - "Target {0} no longer exists, cleanup complete", - fAttempt.targetId); - return; + "Target with jvmId {0} no longer exists, cleanup" + + " complete", + jvmId); + if (event.shouldCommit()) { + event.success = true; + event.skipped = true; + event.commit(); + } + throw new PermanentFailureException( + "Target not found - cleanup complete"); } Target target = targetOpt.get(); var recordingOpt = recordingHelper.getActiveRecording( - target, - r -> - Objects.equals( - r.name, fAttempt.recordingName)); + target, r -> Objects.equals(r.name, recordingName)); + if (recordingOpt.isEmpty()) { logger.infov( "Recording \"{0}\" no longer exists on target {1}," + " cleanup complete", - fAttempt.recordingName, fAttempt.targetId); - return; + recordingName, jvmId); + if (event.shouldCommit()) { + event.success = true; + event.skipped = true; + event.commit(); + } + throw new PermanentFailureException( + "Recording not found - cleanup complete"); } - var recording = recordingOpt.get(); - try { - recordingHelper.stopRecording(recording).await().indefinitely(); - logger.infov( - "Successfully stopped recording \"{0}\" on target {1}", - fAttempt.recordingName, fAttempt.targetId); - } catch (Exception stopEx) { - throw new RuntimeException(stopEx); - } + return recordingHelper + .stopRecording(recordingOpt.get()) + .onItem() + .invoke( + () -> { + if (event.shouldCommit()) { + event.success = true; + event.commit(); + } + }) + .onFailure() + .invoke( + t -> { + if (event.shouldCommit()) { + event.success = false; + event.errorMessage = t.getMessage(); + event.commit(); + } + }) + .replaceWithVoid(); }); + } catch (PermanentFailureException e) { + return Uni.createFrom().failure(e); } catch (Exception e) { - if (fAttempt != null) { - int count = fAttempt.attempts.incrementAndGet(); - int delay = (int) Math.pow(2, count); - TimeUnit unit = TimeUnit.SECONDS; - int limit = 5; - - // Check if this is a permanent failure (target/recording gone) - boolean isPermanentFailure = isPermanentCleanupFailure(e); - - if (isPermanentFailure) { - logger.infov( - "Recording \"{0}\" cleanup on target {1} failed permanently: {2}." - + " Will not retry.", - fAttempt.recordingName, fAttempt.targetId, e.getMessage()); - } else if (count < limit) { - logger.debugv( - "Recording \"{0}\" cleanup attempt #{1} on target {2} failed," - + " rescheduling in {3}{4} ...", - fAttempt.recordingName, count - 1, fAttempt.targetId, delay, unit); - Infrastructure.getDefaultWorkerPool() - .schedule(() -> cleanups.add(fAttempt), delay, unit); - } else { - logger.errorv( - "Recording \"{0}\" cleanup attempt #{1} failed on target {2}" - + " - limit ({3}) reached! Will not retry...", - fAttempt.recordingName, count, fAttempt.targetId, limit); - } - } - if (!isPermanentCleanupFailure(e)) { - logger.warn(e); + if (event.shouldCommit()) { + event.success = false; + event.errorMessage = e.getMessage(); + event.commit(); } + return Uni.createFrom().failure(e); } } - private boolean isPermanentCleanupFailure(Throwable t) { - if (t == null) { - return false; - } - // Target deleted or recording already stopped/deleted - if (t instanceof NoResultException) { - return true; - } - if (t instanceof NotFoundException) { - return true; - } - if (t instanceof HttpException httpEx) { - // 404 = recording not found, 400 = bad request (recording already stopped) - return httpEx.getStatusCode() == 404 || httpEx.getStatusCode() == 400; + void applyRulesToTarget(Target target) { + if (target.jvmId == null) { + logger.warnv("Target {0} has no jvmId, cannot apply rules", target.connectUrl); + return; } - // Check cause recursively - return isPermanentCleanupFailure(t.getCause()); - } - private void resetActivations(Rule rule) { - resetActivations(a -> a.ruleId == rule.id); + for (var rule : enabledRules()) { + try { + if (evaluator.applies(rule.matchExpression, target)) { + scheduleActivationJob(rule, target); + } + } catch (ScriptException se) { + logger.error(se); + } + } } - private void resetActivations(Target target) { - resetActivations(a -> a.targetId == target.id); - resetCleanups(c -> c.targetId == target.id); + void applyRuleToMatchingTargets(Rule rule) { + var targets = evaluator.getMatchedTargets(rule.matchExpression); + for (var target : targets) { + if (target.jvmId != null) { + scheduleActivationJob(rule, target); + } else { + logger.warnv( + "Target {0} has no jvmId, cannot apply rule {1}", + target.connectUrl, rule.name); + } + } } - private void resetActivations(Predicate p) { - Iterator it = activations.iterator(); - while (it.hasNext()) { - ActivationAttempt attempt = it.next(); - if (p.test(attempt)) { - it.remove(); + private void scheduleActivationJob(Rule rule, Target target) { + try { + JobDetail job = + JobBuilder.newJob(RuleActivationJob.class) + .withIdentity( + "activation-" + rule.name + "-" + target.jvmId, + "rule-activations") + .usingJobData("ruleName", rule.name) + .usingJobData("jvmId", target.jvmId) + .requestRecovery() + .build(); + + if (quartz.checkExists(job.getKey())) { + logger.debugv( + "Activation job already exists: rule={0} jvmId={1}", + rule.name, target.jvmId); + return; } + + Trigger trigger = + TriggerBuilder.newTrigger() + .withIdentity(job.getKey().getName(), job.getKey().getGroup()) + .startNow() + .build(); + + quartz.scheduleJob(job, trigger); + logger.debugv("Scheduled activation job: rule={0} jvmId={1}", rule.name, target.jvmId); + + } catch (SchedulerException e) { + logger.errorv( + e, + "Failed to schedule activation job: rule={0} jvmId={1}", + rule.name, + target.jvmId); } } - private void resetCleanups(Predicate p) { - Iterator it = cleanups.iterator(); - while (it.hasNext()) { - CleanupAttempt attempt = it.next(); - if (p.test(attempt)) { - it.remove(); + private void scheduleCleanupJob(Rule rule, Target target) { + if (target.jvmId == null) { + logger.warnv("Target {0} has no jvmId, cannot schedule cleanup", target.connectUrl); + return; + } + + try { + JobDetail job = + JobBuilder.newJob(RecordingCleanupJob.class) + .withIdentity( + "cleanup-" + rule.name + "-" + target.jvmId, "rule-cleanups") + .usingJobData("ruleName", rule.name) + .usingJobData("jvmId", target.jvmId) + .usingJobData("recordingName", rule.getRecordingName()) + .requestRecovery() + .build(); + + if (quartz.checkExists(job.getKey())) { + logger.debugv( + "Cleanup job already exists: rule={0} jvmId={1}", rule.name, target.jvmId); + return; } + + Trigger trigger = + TriggerBuilder.newTrigger() + .withIdentity(job.getKey().getName(), job.getKey().getGroup()) + .startNow() + .build(); + + quartz.scheduleJob(job, trigger); + logger.debugv("Scheduled cleanup job: rule={0} jvmId={1}", rule.name, target.jvmId); + + } catch (SchedulerException e) { + logger.errorv( + e, + "Failed to schedule cleanup job: rule={0} jvmId={1}", + rule.name, + target.jvmId); } } - void applyRulesToTarget(Target target) { - resetActivations(target); - for (var rule : enabledRules()) { - try { - if (evaluator.applies(rule.matchExpression, target)) { - activations.add(new ActivationAttempt(rule, target)); + private void cancelJobsForTarget(Target target) { + if (target.jvmId == null) { + return; + } + + try { + Set activationJobs = + quartz.getJobKeys(GroupMatcher.jobGroupEquals("rule-activations")); + Set cleanupJobs = + quartz.getJobKeys(GroupMatcher.jobGroupEquals("rule-cleanups")); + + for (JobKey key : activationJobs) { + if (key.getName().endsWith("-" + target.jvmId)) { + quartz.deleteJob(key); + logger.debugv( + "Cancelled activation job {0} for lost target {1}", + key.getName(), target.jvmId); } - } catch (ScriptException se) { - logger.error(se); } + + for (JobKey key : cleanupJobs) { + if (key.getName().endsWith("-" + target.jvmId)) { + quartz.deleteJob(key); + logger.debugv( + "Cancelled cleanup job {0} for lost target {1}", + key.getName(), target.jvmId); + } + } + } catch (SchedulerException e) { + logger.errorv(e, "Failed to cancel jobs for target {0}", target.jvmId); } } @@ -375,87 +498,63 @@ private static List enabledRules() { .toList(); } - void applyRuleToMatchingTargets(Rule r) { - resetActivations(r); - var targets = evaluator.getMatchedTargets(r.matchExpression); - for (var target : targets) { - activations.add(new ActivationAttempt(r, target)); + /** + * Exception indicating a permanent failure that should not be retried. Used for cases like + * target/rule not found where retrying won't help. + * + *

When thrown, SmallRye Fault Tolerance will abort retries and the Quartz job will be + * unscheduled. + */ + public static class PermanentFailureException extends RuntimeException { + public PermanentFailureException(String message) { + super(message); } } - @SuppressFBWarnings("EI_EXPOSE_REP") - public record RuleRecording(Rule rule, ActiveRecording recording) { - public RuleRecording { - Objects.requireNonNull(rule); - Objects.requireNonNull(recording); - } - } + @Label("Rule Activation") + @Description("Tracks rule activation attempts with success/failure status") + @SuppressFBWarnings( + value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD", + justification = "JFR event fields are read by JFR infrastructure via reflection") + public static class RuleActivationEvent extends Event { + @Label("Rule Name") + public String ruleName; - @SuppressFBWarnings("EI_EXPOSE_REP") - public record ActivationAttempt(long ruleId, long targetId, AtomicInteger attempts) { - public ActivationAttempt(Rule rule, Target target) { - this(rule.id, target.id, new AtomicInteger(0)); - } + @Label("JVM ID") + public String jvmId; - public ActivationAttempt { - Objects.requireNonNull(attempts); - if (ruleId < 0) { - throw new IllegalArgumentException(); - } - if (targetId < 0) { - throw new IllegalArgumentException(); - } - if (attempts.get() < 0) { - throw new IllegalArgumentException(); - } - } + @Label("Success") + public boolean success; - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (!(obj instanceof ActivationAttempt other)) return false; - return ruleId == other.ruleId && targetId == other.targetId; - } + @Label("Permanent Failure") + public boolean permanentFailure; - @Override - public int hashCode() { - return Objects.hash(ruleId, targetId); - } + @Label("Error Message") + public String errorMessage; } - @SuppressFBWarnings("EI_EXPOSE_REP") - public record CleanupAttempt( - String ruleName, long targetId, String recordingName, AtomicInteger attempts) { - public CleanupAttempt(Rule rule, Target target) { - this(rule.name, target.id, rule.getRecordingName(), new AtomicInteger(0)); - } + @Label("Recording Cleanup") + @Description("Tracks recording cleanup attempts with success/failure status") + @SuppressFBWarnings( + value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD", + justification = "JFR event fields are read by JFR infrastructure via reflection") + public static class RecordingCleanupEvent extends Event { + @Label("Rule Name") + public String ruleName; - public CleanupAttempt { - Objects.requireNonNull(ruleName); - Objects.requireNonNull(recordingName); - Objects.requireNonNull(attempts); - if (targetId < 0) { - throw new IllegalArgumentException(); - } - if (attempts.get() < 0) { - throw new IllegalArgumentException(); - } - } + @Label("JVM ID") + public String jvmId; - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (!(obj instanceof CleanupAttempt other)) return false; - return targetId == other.targetId && Objects.equals(recordingName, other.recordingName); - } + @Label("Recording Name") + public String recordingName; - @Override - public int hashCode() { - return Objects.hash(targetId, recordingName); - } + @Label("Success") + public boolean success; - public int incrementAndGet() { - return this.attempts.incrementAndGet(); - } + @Label("Skipped") + public boolean skipped; + + @Label("Error Message") + public String errorMessage; } } diff --git a/src/test/java/io/cryostat/rules/RulesArchiverTest.java b/src/test/java/io/cryostat/rules/RulesArchiverTest.java index b5df707ed..37030d501 100644 --- a/src/test/java/io/cryostat/rules/RulesArchiverTest.java +++ b/src/test/java/io/cryostat/rules/RulesArchiverTest.java @@ -20,9 +20,6 @@ import java.io.IOException; import java.time.Duration; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import io.cryostat.AbstractTransactionalTestBase; @@ -60,9 +57,6 @@ public class RulesArchiverTest extends AbstractTransactionalTestBase { """, RULE_NAME); - private static final ScheduledExecutorService worker = - Executors.newSingleThreadScheduledExecutor(); - @AfterEach void cleanupRulesArchiverTest() { cleanupSelfActiveAndArchivedRecordings(); @@ -107,31 +101,47 @@ public void test() .then() .statusCode(201); - // stop further background jobs before checking results - worker.schedule( - () -> { - given().log() - .all() - // do not clean, or else Cryostat will archive the recording on stop and - // create an additional copy - .queryParam("clean", false) - .pathParam("ruleName", RULE_NAME) - .delete("/api/v4/rules/{ruleName}") - .then() - .log() - .all() - .and() - .assertThat() - .statusCode(204) - .body(Matchers.emptyOrNullString()); - }, - // this is enough time for 4-5 copies to be made, but we expect the oldest to get - // rolled over so 3 should remain - 50, - TimeUnit.SECONDS); - - webSocketClient.expectNotification("ArchivedRecordingDeleted", Duration.ofSeconds(50)); - webSocketClient.expectNotification("RuleDeleted", Duration.ofSeconds(65)); + // Wait for rule creation to be processed + webSocketClient.expectNotification("RuleCreated", Duration.ofSeconds(5)); + + // Wait for the rule to activate and start the recording + webSocketClient.expectNotification("ActiveRecordingCreated", Duration.ofSeconds(15)); + + // Wait for archives to be created. With preservedArchives=3 and archivalPeriodSeconds=10: + // Note: initialDelaySeconds=0 is converted to archivalPeriodSeconds (10s) in RuleExecutor + // - 1st archive at ~10s after recording starts (initial delay) + // - 2nd archive at ~20s after recording starts + // - 3rd archive at ~30s after recording starts + // - 4th archive at ~40s after recording starts (triggers deletion of 1st, then creates 4th) + + // Wait for first 3 archives to be created + webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); + webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); + webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(15)); + + // Wait for deletion that occurs with 4th archive job + webSocketClient.expectNotification("ArchivedRecordingDeleted", Duration.ofSeconds(15)); + + // Wait for the 4th archive to be created (happens after deletion in same job) + webSocketClient.expectNotification("ArchivedRecordingCreated", Duration.ofSeconds(5)); + + // Stop further background jobs before checking results + given().log() + .all() + // do not clean, or else Cryostat will archive the recording on stop and + // create an additional copy + .queryParam("clean", false) + .pathParam("ruleName", RULE_NAME) + .delete("/api/v4/rules/{ruleName}") + .then() + .log() + .all() + .and() + .assertThat() + .statusCode(204) + .body(Matchers.emptyOrNullString()); + + webSocketClient.expectNotification("RuleDeleted", Duration.ofSeconds(5)); given().log() .all()