
Feat:Health check adapter#4330

Open
Nguyen-Bang wants to merge 10 commits into
apache:devfrom
Nguyen-Bang:health-check-adapter

@Nguyen-Bang Nguyen-Bang commented Mar 31, 2026

Adapter Health Check System

How to use

  • On adapter start-up, the status light stays grey until the first health check runs (after 15 seconds).
  • Click the status light in the adapter overview to open the health window.
  • The window shows two lights: one for the StreamPipes backend (the previous behaviour of the status light) and one for the data source (new).
  • A manual check button in the top right of the window triggers an immediate check (see 1st picture).
  • Automatic checks run every 60 seconds while the adapter is healthy. While it is unhealthy, the time until the next check is shown at the bottom of the window (see 2nd picture).
  • During a check, the lights turn grey only inside the health window, as visual feedback that a check is currently running (see 3rd picture). The status light in the adapter overview does not turn grey, to avoid confusing the user.
  • For unsupported adapter types, the window says so (see 4th picture).
[Four screenshots, referenced in the list above as the 1st to 4th picture]

Overview

The Adapter Health Check system monitors the health of running adapters by periodically checking both the backend service status and the data source connectivity.

Supported Adapters

| Adapter | Health Check Type | What's Verified |
|---|---|---|
| Kafka | Broker + Topic | Cluster connectivity, topic existence |
| MQTT | Broker + Topic | Broker connectivity, topic subscription |
| OPC-UA | Server + Session | Server state node read, subscription status (for subscription mode), auto-reconnect on failure |

Health Check Intervals

| Scenario | Interval |
|---|---|
| Initial check after adapter start | 15 seconds |
| Successful check (healthy) | 60 seconds |
| After failure | Exponential backoff: 60s × 2^failures |
| Maximum interval | 24 hours (86,400,000 ms) |
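
As a rough illustration of the schedule in the table, the following sketch computes the delay until the next check. The class and method names are hypothetical and not taken from this PR, and it assumes that `failures` means the number of consecutive failed checks, so the first failure already doubles the interval to 120 seconds.

```java
import java.time.Duration;

/** Sketch of the interval schedule from the table (hypothetical names, not the PR's code). */
final class HealthCheckBackoff {
  static final Duration INITIAL_DELAY = Duration.ofSeconds(15);
  static final Duration HEALTHY_INTERVAL = Duration.ofSeconds(60);
  static final Duration MAX_INTERVAL = Duration.ofHours(24);

  private HealthCheckBackoff() {}

  /** Delay until the next check, given the number of consecutive failures so far. */
  static Duration nextInterval(int consecutiveFailures) {
    if (consecutiveFailures <= 0) {
      return HEALTHY_INTERVAL;
    }
    // 60s * 2^11 already exceeds 24h, so clamping the exponent avoids overflow.
    int exponent = Math.min(consecutiveFailures, 11);
    Duration backoff = HEALTHY_INTERVAL.multipliedBy(1L << exponent);
    return backoff.compareTo(MAX_INTERVAL) > 0 ? MAX_INTERVAL : backoff;
  }

  public static void main(String[] args) {
    for (int failures = 0; failures <= 12; failures++) {
      System.out.println(failures + " failure(s) -> " + nextInterval(failures).toSeconds() + " s");
    }
  }
}
```

Under these assumptions the cap is reached at the 11th consecutive failure, since 60 s × 2^10 is still below 24 hours.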

Architecture

Core Files

| File | Description |
|---|---|
| streampipes-extensions-api/.../IDataSourceHealthCheck.java | Interface for adapters to implement |
| streampipes-extensions-api/.../DataSourceHealthCheckResult.java | Health check result record |
| streampipes-extensions-management/.../AdapterHealthCheckManager.java | Central manager (singleton enum) |
| streampipes-rest-extensions/.../AdapterHealthResource.java | REST API endpoint |
| streampipes-model/.../AdapterHealthStatus.java | Status model |
| streampipes-model/.../HealthCheckStatus.java | Status enum (HEALTHY, UNHEALTHY, UNKNOWN) |

UI Files

| File | Description |
|---|---|
| ui/.../adapter-health.service.ts | HTTP service for health API |
| ui/.../adapter-status-light/ | Status indicator component |
| ui/.../adapter-health/details-dialog/ | Detailed health dialog |
| ui/.../adapter-health/status-section/ | Reusable backend/data-source status section |
| ui/.../adapter-health/error-output/ | Error output and stack trace section |

Adding Support for New Adapters

  1. Implement IDataSourceHealthCheck interface in your adapter class:
public class MyAdapter implements StreamPipesAdapter, IDataSourceHealthCheck {
  
  @Override
  public DataSourceHealthCheckResult checkDataSourceHealth() {
    try {
      // Verify connectivity to your data source
      if (isConnected()) {
        return DataSourceHealthCheckResult.healthy("Connection OK");
      }
      return DataSourceHealthCheckResult.unhealthy("Not connected");
    } catch (Exception e) {
      return DataSourceHealthCheckResult.unhealthyWithException("Check failed", e);
    }
  }
}
  2. The AdapterHealthCheckManager automatically detects adapters implementing IDataSourceHealthCheck when they are registered.
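
The detection step can be pictured as a type check at registration time. The sketch below is an assumption about the mechanism, not the manager's actual code: `Adapter`, `HealthCheckable`, and `HealthRegistry` are hypothetical stand-ins for `StreamPipesAdapter`, `IDataSourceHealthCheck`, and `AdapterHealthCheckManager`, reduced so the example compiles on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Stand-in for StreamPipesAdapter (hypothetical, reduced for this sketch). */
interface Adapter {}

/** Stand-in for IDataSourceHealthCheck; the real method returns a DataSourceHealthCheckResult. */
interface HealthCheckable {
  boolean isDataSourceHealthy();
}

/** Hypothetical registry that detects health-check support when an adapter is registered. */
final class HealthRegistry {
  private final Map<String, HealthCheckable> checkable = new ConcurrentHashMap<>();

  /** Registers the adapter and returns true if it supports data-source checks. */
  boolean register(String adapterId, Adapter adapter) {
    if (adapter instanceof HealthCheckable) {
      checkable.put(adapterId, (HealthCheckable) adapter);
      return true;
    }
    return false; // unsupported adapter type: only backend health can be reported
  }

  void unregister(String adapterId) {
    checkable.remove(adapterId);
  }

  boolean supportsDataSourceCheck(String adapterId) {
    return checkable.containsKey(adapterId);
  }
}

/** Example adapter that opts in by implementing the check interface. */
final class CheckedAdapter implements Adapter, HealthCheckable {
  @Override
  public boolean isDataSourceHealthy() {
    return true;
  }
}

/** Example adapter without data-source health support. */
final class PlainAdapter implements Adapter {}
```

With this shape, an adapter author only has to implement the interface; no separate registration call is needed.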

REST API

| Endpoint | Method | Description |
|---|---|---|
| /api/v1/adapter-health | GET | Get all adapter health statuses |
| /api/v1/adapter-health/{adapterId} | GET | Get specific adapter health |
| /api/v1/adapter-health/{adapterId}/trigger | POST | Force immediate health check |
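
A minimal sketch of how a client might build requests for these three endpoints with the JDK's `java.net.http` API. The class name and the service URL are placeholders, authentication is omitted, and adapter IDs are assumed to be URL-safe; none of this is part of the PR.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical request builder for the endpoints listed in the table. */
final class AdapterHealthRequests {
  private static final String BASE_PATH = "/api/v1/adapter-health";
  private final String serviceUrl;

  AdapterHealthRequests(String serviceUrl) {
    // Drop a trailing slash so paths can be appended uniformly.
    this.serviceUrl = serviceUrl.endsWith("/")
        ? serviceUrl.substring(0, serviceUrl.length() - 1)
        : serviceUrl;
  }

  /** GET all adapter health statuses. */
  HttpRequest allStatuses() {
    return HttpRequest.newBuilder(URI.create(serviceUrl + BASE_PATH)).GET().build();
  }

  /** GET the health status of one adapter (ID assumed to be URL-safe). */
  HttpRequest status(String adapterId) {
    return HttpRequest.newBuilder(URI.create(serviceUrl + BASE_PATH + "/" + adapterId))
        .GET()
        .build();
  }

  /** POST with an empty body to force an immediate check. */
  HttpRequest trigger(String adapterId) {
    return HttpRequest.newBuilder(URI.create(serviceUrl + BASE_PATH + "/" + adapterId + "/trigger"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
  }

  public static void main(String[] args) {
    // Placeholder URL; replace with the address of the service exposing the endpoints.
    AdapterHealthRequests requests = new AdapterHealthRequests("http://localhost:8090");
    HttpRequest trigger = requests.trigger("my-adapter-id");
    System.out.println(trigger.method() + " " + trigger.uri());
  }
}
```

A request built this way can be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.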

Self-Healing (OPC-UA)

I had the problem that once the adapter fails, it fails forever (this may be specific to my OPC-UA simulator): even when the OPC-UA server comes back online, the adapter does not automatically resubscribe. To fix that, the health check now imitates a restart of the adapter.

The OPC-UA adapter includes auto-reconnection logic:

  • When a health check fails, it attempts to reconnect
  • Clears cached state and re-initializes the connection
  • Reports success/failure of reconnection attempt
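
The three steps above can be sketched roughly as follows. `ReconnectableSource` and `CheckOutcome` are hypothetical types introduced only for illustration; they are neither the OPC-UA client API nor the PR's `DataSourceHealthCheckResult`.

```java
/** Hypothetical connection abstraction; not the OPC-UA client or StreamPipes API. */
interface ReconnectableSource {
  boolean isReachable() throws Exception;

  void disconnect();

  void connect() throws Exception;
}

/** Simplified outcome of one health check. */
record CheckOutcome(boolean healthy, String message) {}

final class SelfHealingCheck {
  private SelfHealingCheck() {}

  /** Runs a check and, if it fails, imitates an adapter restart by reconnecting. */
  static CheckOutcome run(ReconnectableSource source) {
    try {
      if (source.isReachable()) {
        return new CheckOutcome(true, "Connection OK");
      }
    } catch (Exception e) {
      // A failing probe is treated like "not reachable"; fall through to the reconnect.
    }
    try {
      source.disconnect(); // clear cached state
      source.connect();    // re-initialize the connection
      return new CheckOutcome(true, "Reconnected after failed check");
    } catch (Exception e) {
      return new CheckOutcome(false, "Reconnect failed: " + e.getMessage());
    }
  }
}
```

Whether a successful reconnect should be reported as healthy right away or only after the next regular check is a design choice; this sketch reports it immediately.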

Copilot AI review requested due to automatic review settings March 31, 2026 00:38
@github-actions github-actions Bot added the dependencies, java, ui, and backend labels Mar 31, 2026
Copilot AI left a comment

Pull request overview

Introduces an adapter health-check system that periodically evaluates (1) extension-service/backend availability and (2) adapter data-source connectivity, surfaces the aggregated status via new REST endpoints, and adds UI affordances (status light + details dialog + manual trigger) to inspect and trigger checks.

Changes:

  • Added backend + extensions REST resources and shared model types for adapter health status reporting and triggering.
  • Implemented a central health-check scheduler/manager in extensions-management and added data-source health checks for Kafka, MQTT, and OPC-UA adapters.
  • Added Angular UI integration: polling health status, passing it into the status light, and a new details dialog with countdown + manual trigger.

Reviewed changes

Copilot reviewed 23 out of 24 changed files in this pull request and generated 11 comments.

Show a summary per file
| File | Description |
|---|---|
| ui/src/app/connect/services/adapter-health.service.ts | Angular HTTP wrapper for adapter-health list + trigger endpoints |
| ui/src/app/connect/model/adapter-health-status.model.ts | UI model/types for adapter health status + enum |
| ui/src/app/connect/components/existing-adapters/existing-adapters.component.ts | Adds periodic polling and stores adapter health statuses for the overview table |
| ui/src/app/connect/components/existing-adapters/existing-adapters.component.html | Wires health status into the status light component |
| ui/src/app/connect/components/existing-adapters/adapter-status-light/adapter-status-light.component.ts | Enhances status light to reflect health state and open health details dialog |
| ui/src/app/connect/components/existing-adapters/adapter-status-light/adapter-status-light.component.html | Makes status indicator clickable with tooltip and dynamic class binding |
| ui/src/app/connect/components/existing-adapters/adapter-status-light/adapter-status-light.component.scss | Adds hover/transition styling for clickable status light |
| ui/src/app/connect/components/existing-adapters/adapter-health-details-dialog/adapter-health-details-dialog.component.ts | New dialog to show backend/data-source health, countdown, and trigger action |
| ui/src/app/connect/components/existing-adapters/adapter-health-details-dialog/adapter-health-details-dialog.component.html | Dialog UI rendering of health indicators, details, and backoff info |
| ui/src/app/connect/components/existing-adapters/adapter-health-details-dialog/adapter-health-details-dialog.component.scss | Styling for dialog layout and health status presentation |
| ui/package-lock.json | Dependency lockfile updates (eslint packages and transitive bumps) |
| streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AdapterHealthResource.java | New backend aggregator endpoint (/api/v2) to query/trigger extension adapter-health endpoints |
| streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/monitoring/AdapterHealthResource.java | New extensions endpoint (/api/v1) exposing manager health statuses + trigger |
| streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/HealthCheckStatus.java | New shared enum for health check statuses |
| streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterHealthStatus.java | New shared model representing adapter health and scheduling metadata |
| streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java | Adds OPC-UA data-source health check with reconnect/self-heal attempt |
| streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttHealthChecker.java | New MQTT broker/topic connectivity checker used by health checks |
| streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java | Refactors MQTT client creation to allow reuse for health-checker vs runtime client |
| streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java | Implements IDataSourceHealthCheck for MQTT adapters |
| streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java | Implements IDataSourceHealthCheck for Kafka adapters (broker + topic existence) |
| streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/AdapterHealthCheckManager.java | New scheduler/manager maintaining adapter health statuses and exponential backoff |
| streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/init/RunningAdapterInstances.java | Registers/unregisters adapters with the health-check manager |
| streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/IDataSourceHealthCheck.java | New SPI interface for adapters to expose data-source health checks |
| streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/DataSourceHealthCheckResult.java | New result record for health checks (status/message/details/exception) |
Files not reviewed (1)
  • ui/package-lock.json: Language not supported

Comment on lines +134 to +139
if (backendHealth != HealthCheckStatus.HEALTHY) {
overallStatus = HealthCheckStatus.UNHEALTHY;
} else if (!dataSourceHealthSupported) {
overallStatus = backendHealth;
} else {
overallStatus = dataSourceHealth;

Copilot AI Mar 31, 2026

updateOverallStatus treats any non-HEALTHY backend status (including UNKNOWN) as UNHEALTHY, which makes the overall status misleading when the backend state is simply unknown. Consider explicitly propagating UNKNOWN (e.g., if backendHealth is UNKNOWN then overallStatus should be UNKNOWN) and only mark UNHEALTHY when backendHealth is UNHEALTHY.

Suggested change
- if (backendHealth != HealthCheckStatus.HEALTHY) {
-   overallStatus = HealthCheckStatus.UNHEALTHY;
- } else if (!dataSourceHealthSupported) {
-   overallStatus = backendHealth;
- } else {
-   overallStatus = dataSourceHealth;
+ if (backendHealth == HealthCheckStatus.UNKNOWN) {
+   // Backend health is unknown, so the overall status should also be unknown
+   overallStatus = HealthCheckStatus.UNKNOWN;
+ } else if (backendHealth == HealthCheckStatus.UNHEALTHY) {
+   // A clearly unhealthy backend makes the overall status unhealthy
+   overallStatus = HealthCheckStatus.UNHEALTHY;
+ } else {
+   // Backend is healthy here
+   if (!dataSourceHealthSupported) {
+     // No data source health information; overall follows backend health (HEALTHY)
+     overallStatus = backendHealth;
+   } else if (dataSourceHealth == HealthCheckStatus.UNKNOWN) {
+     // Data source health is unknown, so reflect that in the overall status
+     overallStatus = HealthCheckStatus.UNKNOWN;
+   } else {
+     // Use the concrete data source health status (HEALTHY or UNHEALTHY)
+     overallStatus = dataSourceHealth;
+   }

return adapter[column];
};

this.healthPoll$ = interval(5000).subscribe(() => {

Copilot AI Mar 31, 2026

The UI polls /adapter-health every 5s (interval(5000)), even though successful checks run every ~60s and failures back off. This creates unnecessary backend traffic and can become expensive with many users/adapters. Consider polling less frequently (e.g., 30–60s), polling only when the health dialog is open, or switching to a push/refresh-on-demand approach.

Suggested change
- this.healthPoll$ = interval(5000).subscribe(() => {
+ this.healthPoll$ = interval(60000).subscribe(() => {

Comment on lines +55 to +57
@RestController
@RequestMapping("/api/v2/adapter-health")
public class AdapterHealthResource extends AbstractAuthGuardedRestResource {

Copilot AI Mar 31, 2026

PR description lists the public REST endpoint as /api/v1/adapter-health, but the UI calls /api/v2/adapter-health (and this controller is mapped to /api/v2/adapter-health). If v1 is intended only for the extensions service and v2 for the backend aggregator, consider updating the documentation to avoid confusion for API consumers.

Comment on lines +56 to +60
if (!connectLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
return DataSourceHealthCheckResult.unhealthyWithException(
"MQTT connection timed out",
new java.util.concurrent.TimeoutException("Timed out waiting for broker response after " + TIMEOUT_SECONDS + "s")
);

Copilot AI Mar 31, 2026

On connect timeout the method returns without disconnecting/closing the client. Consider disconnecting/closing the client before returning (or using a finally block) to avoid leaving background resources running after a timed-out connection attempt.

Comment on lines +62 to +64
if (connectError.get() != null) {
return DataSourceHealthCheckResult.unhealthyWithException("MQTT connection failed", connectError.get());
}

Copilot AI Mar 31, 2026

On connect failure (connectError != null) the method returns without disconnecting/closing the client. Consider cleaning up the client before returning (or using a finally block) to avoid leaking resources after failed connection attempts.
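
One way to cover both the timeout path and the failure path is a single `finally` block around the probe. The sketch below only illustrates that pattern: `ProbeClient` is a hypothetical stand-in for whichever MQTT client `MqttHealthChecker` uses, and the string results stand in for `DataSourceHealthCheckResult`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical async client; stands in for the MQTT client used by the checker. */
interface ProbeClient {
  void connect(Runnable onSuccess, Consumer<Throwable> onFailure);

  void close();
}

final class ConnectProbe {
  private ConnectProbe() {}

  /** Probes connectivity and always closes the client, whatever the outcome. */
  static String probe(ProbeClient client, long timeoutSeconds) {
    CountDownLatch connectLatch = new CountDownLatch(1);
    AtomicReference<Throwable> connectError = new AtomicReference<>();
    try {
      client.connect(connectLatch::countDown, error -> {
        connectError.set(error);
        connectLatch.countDown();
      });
      if (!connectLatch.await(timeoutSeconds, TimeUnit.SECONDS)) {
        return "timeout";
      }
      if (connectError.get() != null) {
        return "failed: " + connectError.get().getMessage();
      }
      return "ok";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return "interrupted";
    } finally {
      client.close(); // runs on success, timeout, failure, and interruption
    }
  }
}
```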

Comment on lines +139 to +143
Thread.currentThread().interrupt();
}
}

return List.of();

Copilot AI Mar 31, 2026

When an exception occurs while calling the extensions service, the method falls through to return List.of(). Returning an empty list discards the failure signal, and upstream code treats it as “no statuses” (and then reports adapters as backend HEALTHY via fallback). Consider propagating a distinct error result (or throwing) so the caller can mark backend health as UNHEALTHY/UNKNOWN for that endpoint.
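
A possible shape for such a distinct error result is sketched below. `FetchResult` and `StatusFetcher` are hypothetical names, and mapping a failed fetch to UNKNOWN rather than UNHEALTHY is an assumption; the point is only that an empty list and a failed call stay distinguishable.

```java
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical wrapper: either the fetched statuses or the error that prevented fetching. */
record FetchResult<T>(List<T> statuses, Exception error) {
  static <T> FetchResult<T> ok(List<T> statuses) {
    return new FetchResult<>(List.copyOf(statuses), null);
  }

  static <T> FetchResult<T> failed(Exception error) {
    return new FetchResult<>(List.of(), error);
  }

  boolean isFailure() {
    return error != null;
  }
}

final class StatusFetcher {
  private StatusFetcher() {}

  /** Wraps a remote call so that a failure is kept instead of collapsing into an empty list. */
  static <T> FetchResult<T> fetch(Callable<List<T>> call) {
    try {
      return FetchResult.ok(call.call());
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      return FetchResult.failed(e);
    }
  }

  /** Backend health to use for a fallback status (assumption: failure maps to UNKNOWN). */
  static String backendHealth(FetchResult<?> result) {
    return result.isFailure() ? "UNKNOWN" : "HEALTHY";
  }
}
```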

Comment on lines +150 to +151
status.setBackendHealth(HealthCheckStatus.HEALTHY);
status.setBackendHealthMessage("Extension service is running");

Copilot AI Mar 31, 2026

createFallbackStatus hardcodes backendHealth=HEALTHY / "Extension service is running". This becomes inaccurate when health status fetching fails (e.g., extensions service down or request error). Consider distinguishing “no data yet” from “extensions service unreachable” and setting backend health to UNHEALTHY/UNKNOWN with an appropriate message in the latter case.

Comment on lines +189 to +193
@org.springframework.web.bind.annotation.PostMapping(value = "/{adapterId}/trigger", produces = MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize("this.hasReadAuthority()")
public ResponseEntity<Void> triggerAdapterHealthCheck(@org.springframework.web.bind.annotation.PathVariable String adapterId) {
try {
var adapter = adapterStorage.getElementById(adapterId);

Copilot AI Mar 31, 2026

triggerAdapterHealthCheck is only guarded by hasReadAuthority() but does not enforce per-adapter permissions (unlike other adapter endpoints). A user with general READ privilege could trigger checks for adapters they are not allowed to read; add an adapter-level permission check (and return 401/403 when unauthorized) before forwarding the trigger request.

status.setAdapterName(description.getName());
status.setBackendHealth(HealthCheckStatus.HEALTHY);
status.setDataSourceHealthSupported(adapter instanceof IDataSourceHealthCheck);
status.setDataSourceHealth(status.isDataSourceHealthSupported() ? HealthCheckStatus.UNKNOWN : HealthCheckStatus.UNKNOWN);

Copilot AI Mar 31, 2026

This ternary is redundant: both branches set HealthCheckStatus.UNKNOWN. This can be simplified to a single assignment (and if desired, initialize an explanatory message when data source checks are unsupported).

Suggested change
- status.setDataSourceHealth(status.isDataSourceHealthSupported() ? HealthCheckStatus.UNKNOWN : HealthCheckStatus.UNKNOWN);
+ status.setDataSourceHealth(HealthCheckStatus.UNKNOWN);

Comment on lines +21 to +22
class="status-light-container"
(click)="openHealthDetails($event)"

Copilot AI Mar 31, 2026

The status light is a clickable <div> without keyboard interaction. For accessibility, use a <button> (preferred) or add role="button", tabindex="0", and key handlers for Enter/Space so the dialog can be opened via keyboard and announced correctly to assistive tech.

Suggested change
- class="status-light-container"
- (click)="openHealthDetails($event)"
+ class="status-light-container"
+ role="button"
+ tabindex="0"
+ (click)="openHealthDetails($event)"
+ (keyup.enter)="openHealthDetails($event)"
+ (keyup.space)="openHealthDetails($event)"

@Nguyen-Bang Nguyen-Bang changed the title Health check adapter Feat:Health check adapter Mar 31, 2026
@github-actions github-actions Bot removed the dependencies label Mar 31, 2026
@github-actions

Hello there 👋

We noticed that it's been some time since activity occurred on your pull request 🤔. In order to keep things moving forward, we're marking this PR as stale and giving you 7 days to respond before it's automatically closed ⏰.

Please take a moment to review your pull request and make any necessary updates or changes 👨‍💻. If you need more time or have any questions, please don't hesitate to let us know 💬.

Thank you for your contributions to our project, and we look forward to hearing back from you soon 🙏.

@github-actions github-actions Bot added the stale label Apr 22, 2026
@github-actions

Hello there 👋

Unfortunately, we didn't hear back from you regarding your pull request, so we're closing it now. Don't worry, you can always reopen the PR at any time if you wish to continue working on it 🙌.

Please note that the branch associated with this pull request will not be deleted, so you can still access your changes and continue to work on them as needed 💻.

Thank you for your contributions to our project, and we hope to see you again soon!

@github-actions github-actions Bot closed this Apr 29, 2026
@dominikriemer dominikriemer reopened this May 7, 2026
@github-actions github-actions Bot removed the stale label May 8, 2026
Labels

backend Everything that is related to the StreamPipes backend java Pull requests that update Java code ui Anything that affects the UI
