feat(sdk): implement vsr header framing#3254
Conversation
Codecov Report❌ Patch coverage is ❌ Your patch check has failed because the patch coverage (22.25%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## master #3254 +/- ##
============================================
- Coverage 73.78% 73.44% -0.35%
Complexity 943 943
============================================
Files 1200 1200
Lines 109116 109661 +545
Branches 86004 86567 +563
============================================
+ Hits 80516 80543 +27
- Misses 25874 26353 +479
- Partials 2726 2765 +39
🚀 New features to boost your workflow:
|
|
/ready |
hubcio
left a comment
There was a problem hiding this comment.
notes on items that did not fit a line range:
- the
IggyNamespace::newmasking issue (namespace.rs:108-113) is upstream and not touched by this PR, but the SDK now feeds raw wire u32s straight into it (vsr.rs:187), so the silent-collision blast radius lands inside this PR. would be good to range-check at the SDK boundary even if the namespace fix lands separately. send_raw_with_responseretry at tcp_client.rs:136-148 is pre-existing, but the newreset_vsr_session()calls at lines 548/567 turn the retry path into a silent recovery failure whenauto_login=Disabled: reset clears the bound session,connect()skips re-login, nextsend_rawencodes with session=None and returnsUnauthenticatedfor what looks like a transient transport error. consider gating retry onsession.is_bound()under vsr, or forcingauto_loginon under vsr.#[cfg(feature = "vsr")]is interleaved per-line acrosscore/sdk/src/{tcp,quic,websocket}/*.rs(8 separate gates in tcp_client.rs alone). consider extractingsend_legacy/send_vsrhelpers per transport, or splitting the vsr framing into its own module, to reduce mis-edit hazard.
| } | ||
| _ => { | ||
| let operation = | ||
| Operation::from_command_code(code).ok_or(IggyError::FeatureUnavailable)?; |
There was a problem hiding this comment.
Operation::from_command_code returns None for every read-only command (PING, GET_STATS, GET_STREAM, POLL_MESSAGES, etc. - see the read_only_commands_have_no_operation test in core/binary_protocol/src/consensus/operation.rs). this branch then returns FeatureUnavailable for every read. any vsr-mode integration test that does client.ping(), get_*, or poll_messages will fail closed - hello_world only logs in so it masks the bug today. the read path needs to bypass the op-table lookup (dedicated Operation::Read/Operation::NonReplicated/Operation::Query or similar, with session + request_id only).
| topic_id: &WireIdentifier, | ||
| partition_id: Option<u32>, | ||
| ) -> Result<u64, IggyError> { | ||
| let stream_id = stream_id.as_u32().ok_or(IggyError::FeatureUnavailable)?; |
There was a problem hiding this comment.
namespace_from_partition calls WireIdentifier::as_u32().ok_or(FeatureUnavailable). the String variant always returns None, so send_messages, store_consumer_offset, delete_consumer_offset, *_2, and delete_segments all fail closed whenever stream/topic is supplied by name (the idiomatic SDK usage). either resolve name → numeric id via metadata round-trip before computing the namespace, or surface a clear numeric-only restriction with a proper error variant.
| let stream_id = stream_id.as_u32().ok_or(IggyError::FeatureUnavailable)?; | ||
| let topic_id = topic_id.as_u32().ok_or(IggyError::FeatureUnavailable)?; | ||
| let partition_id = partition_id.ok_or(IggyError::FeatureUnavailable)?; | ||
| Ok(IggyNamespace::new(stream_id as usize, topic_id as usize, partition_id as usize).inner()) |
There was a problem hiding this comment.
partition_id as usize is unchecked at the SDK boundary, and IggyNamespace::new then does (partition as u64) & PARTITION_MASK (namespace.rs:108-113) with no overflow check on the masked field. any wire partition_id >= 2^20 wraps and two distinct partitions collapse to the same namespace, routing to the wrong shard. validate at the SDK boundary, and ideally make IggyNamespace::new fallible so the limits (4096/4096/1M) are enforced by the type rather than masked away.
| let header = RequestHeader { | ||
| command: Command2::Request, | ||
| operation, | ||
| size: total_size as u32, |
There was a problem hiding this comment.
size: total_size as u32 silently truncates on payloads > 4 GiB. checked_add two lines above only protects the usize math; nothing protects the u32 narrowing. swap for u32::try_from(total_size).map_err(...)
|
|
||
| let mut request = BytesMut::with_capacity(total_size); | ||
| request.put_slice(bytemuck::bytes_of(&header)); | ||
| request.put_slice(payload); |
There was a problem hiding this comment.
fresh BytesMut + two put_slice calls per request memcpy the full header + payload on the hot send path. legacy framing wrote header/code/payload into a caller-owned buffer. consider vectored write of header + payload, or reserve a header slot in the caller buffer so the payload doesn't need to be copied.
|
@hubcio made a commit that addresses issues from those comments |
|
/review |
|
@numinnex my review had 19 comments; youaddressed only 4 (vsr.rs:61 u32::try_from, vsr.rs:209-211, validate_namespace_field, session_manager::add_connection removed, integration-vsr crate removed). 15 issues are still present. if you dont want to fix them now please at least add TODO in code. |
|
/review |
Adds
Vsrfeature flag in thesdkcrate that introduces the consensus header required by the requests.Additionally adds one
hello_worldtest that tests thelogincommand using the new integration test suite adapted toserver-ngbinary.