feat: implement device registration API with kafka event publishing#44
feat: implement device registration API with kafka event publishing#44chethann007 wants to merge 9 commits into
Conversation
…events Update config key from kafka_topics_deviceprofile to kafka_topics_device_events and topic pattern from local.events.deviceprofile to local.events.device for consistency with other event-based topics like certificate_instruction.
sntiwari1
left a comment
There was a problem hiding this comment.
Code Review Summary — feat: implement device registration API with kafka event publishing
Overall Assessment
Solid first cut of the device registration feature — the layered architecture (Controller → Actor → Service → DAO) follows the project's conventions and the async Pekko wiring looks correct. The main concerns are a security issue with IP header trust, a client-controlled timestamp that undermines idempotency guarantees, and a few smaller correctness/safety gaps. No major design problems.
Critical / High
| # | File | Issue |
|---|---|---|
| 1 | DeviceController.java |
resolveIp blindly trusts X-Real-IP/X-Forwarded-For — these headers can be injected by clients |
| 2 | DeviceRegisterServiceImpl.java |
first_access is taken from the request body — clients can set an arbitrary past/future timestamp |
| 3 | — | No DB migration script for lern_device_profile table — the feature can't deploy without it |
Medium
| # | File | Issue |
|---|---|---|
| 4 | DeviceRegisterActor.java |
TelemetryEnvKey.USER is semantically wrong for a device operation |
| 5 | DeviceRegisterActor.java |
Switch case uses hard-coded string "registerDevice" instead of ActorOperations.REGISTER_DEVICE.getValue() |
| 6 | DeviceRegisterActor.java |
deviceRegisterService field is not final |
| 7 | DeviceRegisterRequestValidator.java |
(String) req.get("deviceId") will throw ClassCastException if payload sends a non-string value |
| 8 | DeviceProfileDaoImpl.java |
stmt.setObject(11, userDeclaredOn) — JDBC driver handling of a boxed Long mapped to to_timestamp() is driver-specific; use explicit null handling |
Low / Nit
| # | File | Issue |
|---|---|---|
| 9 | DeviceProfileDaoImpl.java |
Log message concatenates user-supplied device_id directly — log injection |
| 10 | DeviceRegisterServiceImpl.java |
Same log injection pattern |
| 11 | DeviceController.java |
new DeviceRegisterRequestValidator() is instantiated inline; all other validators in the project are injected |
| 12 | DeviceRegisterRequestValidatorTest.java |
Missing whitespace-only device-id test (e.g. " ") — StringUtils.isBlank handles it but it's worth an explicit assertion |
| 13 | application.conf |
Pekko dispatcher reference pekko.actor.rr-dispatcher is likely wrong — the alias rr-dispatcher (no prefix) is what's defined elsewhere; double-check pool config |
What's missing
- DB migration:
lern_device_profileDDL must ship with this PR (or a migration ticket must be tracked). ip_addris extracted but never persisted — document the decision (privacy? future column?) in a comment.- No integration/actor-level tests —
DeviceRegisterActorandDeviceRegisterServiceImplhave no unit tests. - Kafka topic default is
local.events.deviceprofile; make sure thedevelop/staging overrides are in place before merge.
Inline comments below point at specific lines.
| * @return Client IP or empty string | ||
| */ | ||
| private String resolveIp(Http.Request req) { | ||
| return req.header("X-Real-IP").orElse(req.header("X-Forwarded-For").orElse("")); |
There was a problem hiding this comment.
Security — IP header spoofing
X-Real-IP and X-Forwarded-For are HTTP headers that any client can set. If traffic ever reaches this service without going through a trusted proxy that overwrites these headers, an attacker can claim any IP address.
The safe pattern depends on your infra:
- If every request comes through a load balancer/proxy that always overwrites
X-Real-IP, this is fine — but it should be documented and the dependency on that LB config must be explicit. - Otherwise, fall back to
req.remoteAddress()(the actual TCP remote address) which cannot be spoofed.
Suggestion: add a comment naming the LB/proxy that sets this header, or use req.remoteAddress() as the trusted source.
| request.getRequest().put("ip_addr", resolveIp(httpRequest)); | ||
| request.getRequest().put("user_agent", httpRequest.header("User-Agent").orElse("")); | ||
| // Validate | ||
| new DeviceRegisterRequestValidator().validate(request); |
There was a problem hiding this comment.
Nit — validator not injected
Every other controller in this project injects its validator (e.g. via @Inject on a field or constructor). Instantiating new DeviceRegisterRequestValidator() inline makes the controller harder to unit-test and breaks the DI pattern.
Consider:
@Inject private DeviceRegisterRequestValidator validator;
// ...
validator.validate(request);| * | ||
| * @see DeviceRegisterService | ||
| * @see DeviceRegisterServiceImpl | ||
| */ |
There was a problem hiding this comment.
Medium — field should be final
deviceRegisterService is set once in each constructor and never reassigned. Marking it final prevents accidental mutation and clearly documents the intent:
private final DeviceRegisterService deviceRegisterService;| @Override | ||
| public void onReceive(Request request) throws Throwable { | ||
| Util.initializeContext(request, TelemetryEnvKey.USER); | ||
| String operation = request.getOperation(); |
There was a problem hiding this comment.
Medium — wrong telemetry env key
TelemetryEnvKey.USER is the context used for user operations. This is a device operation — if a TelemetryEnvKey.DEVICE (or similar) key exists in the enum, use it. Using the wrong key causes telemetry events to be misclassified in the analytics pipeline.
|
|
||
| switch (operation) { | ||
| case "registerDevice": | ||
| registerDevice(request); |
There was a problem hiding this comment.
Medium — hard-coded operation string
Other actors in this project route on ActorOperations.SOME_OP.getValue() rather than raw string literals. Hard-coding "registerDevice" here creates a silent divergence if the enum value is ever changed:
case ActorOperations.REGISTER_DEVICE.getValue():This also makes it easier to grep for all usages of an operation across the codebase.
| stmt.setString(9, (String) profile.get("user_declared_state")); | ||
| stmt.setString(10, (String) profile.get("user_declared_district")); | ||
| stmt.setObject(11, userDeclaredOn); // nullable Long | ||
|
|
There was a problem hiding this comment.
Medium — fragile null handling with setObject
stmt.setObject(11, userDeclaredOn) relies on the JDBC driver to map a boxed Long (or null) to the SQL parameter used inside to_timestamp(? / 1000.0). Some drivers handle this differently.
Be explicit:
if (userDeclaredOn != null) {
stmt.setLong(11, userDeclaredOn);
} else {
stmt.setNull(11, java.sql.Types.BIGINT);
}|
|
||
| stmt.executeUpdate(); | ||
| log.info("DeviceProfileDaoImpl: UPSERT successful for deviceId=" + profile.get("device_id")); | ||
|
|
There was a problem hiding this comment.
Low — log injection risk
device_id comes from user input. Injecting it directly into the log message allows a malicious value like "\nINFO: admin login successful" to pollute log aggregators and confuse alerting.
Escape or sanitise before logging, or use a structured logging approach:
log.info("DeviceProfileDaoImpl: UPSERT successful", Map.of("deviceId", profile.get("device_id")));(Same pattern applies to the catch block on the next line.)
| String ipAddr = (String) req.get("ip_addr"); | ||
| Long firstAccess = toLong(req.get("first_access")); | ||
| Object dspec = req.get("dspec"); | ||
| Object userDeclaredLocation = req.get("userDeclaredLocation"); |
There was a problem hiding this comment.
High — client-controlled first_access timestamp
first_access is taken directly from the request body. This means any client can:
- Claim their device was registered years ago (bypassing any age-based logic).
- Overwrite the real first-access timestamp on a re-registration.
first_access should be a server-side value: the DB upsert sets it only on INSERT (which the SQL already does — first_access is not in the DO UPDATE clause). There's no need to accept it from the client at all. Remove toLong(req.get("first_access")) and always pass null/now from the server.
| log.info("DeviceRegisterServiceImpl: Device registered - deviceId=" + deviceId); | ||
|
|
||
| // 3. Publish Kafka event (best-effort, non-blocking failure) | ||
| publishKafkaEvent(profile, now); |
There was a problem hiding this comment.
Low — log injection
Same as DeviceProfileDaoImpl: deviceId is user-supplied. Avoid direct string concatenation in log messages for fields that originate from external input.
| "/ssu_user_create_actor/*" { dispatcher = pekko.actor.most-used-one-dispatcher } | ||
| "/sso_user_create_actor" { router = smallest-mailbox-pool, nr-of-instances = 15, dispatcher = most-used-one-dispatcher } | ||
| "/sso_user_create_actor/*" { dispatcher = pekko.actor.most-used-one-dispatcher } | ||
| # --- DEVICE MANAGEMENT ACTORS --- |
There was a problem hiding this comment.
Nit — verify dispatcher reference
The pool entry uses dispatcher = rr-dispatcher, but the child dispatcher line uses pekko.actor.rr-dispatcher (with the pekko.actor. prefix). Compare with the other actor blocks immediately above where child dispatchers are referenced as pekko.actor.most-used-one-dispatcher. The rr-dispatcher alias should resolve to a defined dispatcher — double-check this doesn't silently fall back to the default dispatcher in production.
Description
Implement complete device registration functionality across lern-service and userorg-service. Includes REST API endpoint, request validation, async processing via DeviceRegisterActor, YugabyteSQL database persistence, and kafka event publishing for downstream consumers.
Fixes # (issue reference if applicable)
Type of Change
Microservice(s) Affected
userorg-servicelern-service(Unified/Common)How Has This Been Tested?
mvn testfor affected modules.Checklist:
Implementation Summary
Core Components:
DeviceRegisterActor: Async message handling for device registrationDeviceRegisterService: Business logic orchestrationDeviceProfileDao: YugabyteSQL persistence with upsert capabilityDeviceController: REST API endpointDeviceRegisterRequestValidator: Request validationFeatures: