RATIS-2546. Add input stream to DataStreamApi for read operations in Client #1481

Merged
szetszwo merged 20 commits into apache:master from peterxcli:RATIS-2546-stream-read-client
Jun 23, 2026

Conversation

@peterxcli

Member

What changes were proposed in this pull request?

  • Added a new DataStreamInput interface and its implementation DataStreamInputImpl for asynchronous, zero-copy read-only data streaming, including the readAsync() method for consuming replies and proper resource release on close.
  • Extended the DataStreamApi interface and its implementation to provide streamReadOnly() methods for creating read-only streams.
  • Introduced the new DataStreamReplyByteBuf class to support replies backed by Netty ByteBuf.
  • Added a new overload of streamAsync() in the DataStreamClientRpc interface that takes a reply consumer, supporting multiple replies per request.

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/RATIS-2546

How was this patch tested?

(Please explain how this patch was tested. Ex: unit tests, manual tests)
(If this patch involves UI changes, please attach a screen-shot; otherwise, remove this)

Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@peterxcli

Member Author

cc @szetszwo

@peterxcli

Member Author

The no-MD5 sequential reads comparison:

| Key size | Buffer | ReadBlock stream | Ratis data stream | Ratis / ReadBlock bandwidth |
| --- | --- | --- | --- | --- |
| 256 MiB | 32 MiB | 333.23 MB/s | 448.04 MB/s | 1.34x |
| 256 MiB | 8 MiB | 651.57 MB/s | 753.94 MB/s | 1.16x |
| 256 MiB | 1 MiB | 346.72 MB/s | 726.91 MB/s | 2.10x |
| 256 MiB | 4 KiB | 303.35 MB/s | 777.43 MB/s | 2.56x |
| 500 MiB | 32 MiB | 731.13 MB/s | 445.67 MB/s | 0.61x |
| 500 MiB | 8 MiB | 587.45 MB/s | 1012.59 MB/s | 1.72x |
| 500 MiB | 1 MiB | 520.76 MB/s | 598.70 MB/s | 1.15x |
| 500 MiB | 4 KiB | 373.36 MB/s | 846.50 MB/s | 2.27x |
| 1 GiB | 32 MiB | 869.43 MB/s | 693.04 MB/s | 0.80x |
| 1 GiB | 8 MiB | 508.69 MB/s | 474.92 MB/s | 0.93x |
| 1 GiB | 1 MiB | 432.13 MB/s | 784.25 MB/s | 1.81x |
| 1 GiB | 4 KiB | 378.85 MB/s | 1087.56 MB/s | 2.87x |
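
For reference, the last column is the Ratis data stream throughput divided by the ReadBlock stream throughput. A minimal sketch that recomputes it for two rows (the class and method names are made up for illustration):

```java
import java.util.Locale;

/** Sketch only: recomputes the ratio column from two throughput figures in the table. */
final class BandwidthRatio {
  /** Ratio of Ratis data stream throughput to ReadBlock stream throughput. */
  static double ratio(double ratisMBps, double readBlockMBps) {
    return ratisMBps / readBlockMBps;
  }

  /** Formats a ratio with two decimals, independent of the default locale. */
  static String format(double ratio) {
    return String.format(Locale.ROOT, "%.2fx", ratio);
  }

  public static void main(String[] args) {
    // Row: 256 MiB key, 32 MiB buffer.
    System.out.println(format(ratio(448.04, 333.23)));
    // Row: 500 MiB key, 32 MiB buffer (a ratio below 1 means ReadBlock was faster).
    System.out.println(format(ratio(445.67, 731.13)));
  }
}
```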

The no-MD5 random reads comparison:

| Key size | Random read size | Direct stream | Ratis data stream | Bandwidth ratio | IOPS ratio | Direct elapsed | Ratis elapsed |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 256 MiB | 1 MiB | 28.92 MB/s | 103.03 MB/s | 3.56x | 3.56x | 1.106 s | 0.311 s |
| 256 MiB | 4 KiB | 0.12 MB/s | 0.38 MB/s | 3.19x | 3.19x | 1.059 s | 0.332 s |
| 500 MiB | 1 MiB | 44.18 MB/s | 255.86 MB/s | 5.79x | 5.79x | 0.724 s | 0.125 s |
| 500 MiB | 4 KiB | 0.14 MB/s | 0.59 MB/s | 4.06x | 4.06x | 0.865 s | 0.213 s |
| 1 GiB | 1 MiB | 33.08 MB/s | 193.64 MB/s | 5.85x | 5.85x | 0.967 s | 0.165 s |
| 1 GiB | 4 KiB | 0.10 MB/s | 0.87 MB/s | 8.34x | 8.34x | 1.197 s | 0.144 s |

I'm looking into why a larger buffer causes throughput to degrade.

@szetszwo szetszwo left a comment

Contributor

@peterxcli , thanks a lot for working on this!

The PR is quite big. Let's separate the DataStreamReply change first. Please see the comments inlined and also https://issues.apache.org/jira/secure/attachment/13082807/1481_review_DataStreamReply.patch (updated the link in the next comment.)

Comment thread ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java Outdated
Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@peterxcli peterxcli marked this pull request as draft June 17, 2026 17:18

@szetszwo szetszwo left a comment

Contributor

@peterxcli , thanks for updating this!

Let's have one more simple change to add an executor and terminalReply (RATIS-2564). Please see the comments inlined and also https://issues.apache.org/jira/secure/attachment/13082846/1481_review3_refactoring.patch

Comment thread ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java Outdated
Comment thread ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java Outdated
Comment thread ratis-netty/src/main/java/org/apache/ratis/netty/server/ReadStreamManagement.java Outdated
Signed-off-by: peterxcli <peterxcli@gmail.com>

/** Async call to send a request and receive multiple replies for the request. */
default CompletableFuture<DataStreamReply> streamAsync(
    DataStreamRequest request, Consumer<DataStreamReply> replyConsumer) {

@szetszwo szetszwo Jun 18, 2026

Contributor

@peterxcli , Let's borrow the idea from gRPC StreamObserver and add a similar interface. It is more flexible for future changes.

//ratis-common
package org.apache.ratis.datastream;

/** An interface similar to gRPC {@link org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver}. */
public interface DataStreamObserver<V> {
  void onNext(V value);

  // see if onError(Throwable) and onCompleted() are useful.  Or we may add them later.
}

Then our interface will be similar to gRPC service https://github.com/apache/ratis/blob/master/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java#L174-L181

//DataStreamClientRpc
  /** Async call to send a request and receive multiple replies for the request. */
  default CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request,
      DataStreamObserver<DataStreamReplyByteBuf> replyHandler) {
    throw new UnsupportedOperationException(getClass() + " does not support "
        + JavaUtils.getCurrentStackTraceElement().getMethodName());
  }
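
To make the observer idea concrete, here is a minimal, self-contained sketch. It uses stand-in types only: `Observer`, `Reply`, and the in-memory `streamAsync` stub below are hypothetical and are not the Ratis classes. It also assumes that the returned future completes with the terminal reply, while every earlier reply goes to `onNext`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch only: stand-in types, not the actual Ratis classes. */
final class ObserverSketch {
  /** Mirrors the proposed observer shape: one callback per reply. */
  interface Observer<V> {
    void onNext(V value);
  }

  /** Hypothetical reply: a payload plus a flag marking the last reply of the stream. */
  static final class Reply {
    final String payload;
    final boolean terminal;

    Reply(String payload, boolean terminal) {
      this.payload = payload;
      this.terminal = terminal;
    }
  }

  /**
   * Hypothetical RPC stub: pushes every non-terminal reply to the observer
   * and completes the returned future with the terminal reply (an assumption).
   */
  static CompletableFuture<Reply> streamAsync(List<Reply> wire, Observer<Reply> observer) {
    final CompletableFuture<Reply> terminal = new CompletableFuture<>();
    for (Reply r : wire) {
      if (r.terminal) {
        terminal.complete(r);
        break;
      }
      observer.onNext(r);
    }
    return terminal;
  }

  /** Runs the stub against three canned replies and returns what the caller observed. */
  static List<String> demo() {
    final List<String> seen = new ArrayList<>();
    final List<Reply> wire = Arrays.asList(
        new Reply("chunk-0", false), new Reply("chunk-1", false), new Reply("done", true));
    final CompletableFuture<Reply> terminal = streamAsync(wire, r -> seen.add(r.payload));
    seen.add("terminal=" + terminal.join().payload);
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Compared with a plain `Consumer`, a dedicated interface leaves room to add `onError(Throwable)` and `onCompleted()` later without changing the `streamAsync` signature.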

Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@peterxcli peterxcli marked this pull request as ready for review June 18, 2026 20:05
Signed-off-by: peterxcli <peterxcli@gmail.com>
@peterxcli

peterxcli commented Jun 18, 2026

Member Author

@szetszwo thanks for the gRPC observer suggestion, I just added an initial version of it.

I also added onError and onComplete interface methods to the observer, because we can decide which hook to call based on the terminal reply.

Please take a look, Thanks!

Comment thread ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java Outdated
Comment thread ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java Outdated
Comment thread ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java Outdated
Co-Authored-By: amaliujia <amaliujia@apache.org>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@peterxcli peterxcli force-pushed the RATIS-2546-stream-read-client branch from aa3ab09 to 7b15a13 Compare June 19, 2026 14:25
Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@peterxcli peterxcli requested review from amaliujia and szetszwo June 19, 2026 21:33
Signed-off-by: peterxcli <peterxcli@gmail.com>

@szetszwo szetszwo left a comment

Contributor

@peterxcli , thanks for the update!

In DataStreamInput, readAsync() should return a ReferenceCountedObject so the user can retain/release the buffer.

Please see the comments inlined and also https://issues.apache.org/jira/secure/attachment/13082885/1481_review4.patch
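
As a rough illustration of the retain/release contract mentioned above, the following sketch uses a hypothetical `RefCountedSketch` class. It is not the Ratis `ReferenceCountedObject` and only shows the counting idea: the buffer is freed when the last reference is released.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch only: a minimal reference-counted holder, not the actual Ratis ReferenceCountedObject. */
final class RefCountedSketch<T> {
  private final T value;
  private final Runnable onFree;
  private final AtomicInteger count = new AtomicInteger();

  RefCountedSketch(T value, Runnable onFree) {
    this.value = value;
    this.onFree = onFree;
  }

  /** Takes a reference and returns the value; every retain() needs a matching release(). */
  T retain() {
    count.incrementAndGet();
    return value;
  }

  /** Drops a reference; runs the free action and returns true when the last reference is gone. */
  boolean release() {
    final int remaining = count.decrementAndGet();
    if (remaining < 0) {
      throw new IllegalStateException("released more often than retained");
    }
    if (remaining == 0) {
      onFree.run();
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    final boolean[] freed = {false};
    final RefCountedSketch<String> ref = new RefCountedSketch<>("buffer", () -> freed[0] = true);
    ref.retain();                      // reference held by the receiving side
    ref.retain();                      // the user keeps the buffer a bit longer
    System.out.println(ref.release()); // one reference remains, nothing is freed yet
    System.out.println(ref.release()); // last reference dropped, the free action runs
  }
}
```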

Comment thread ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamObserver.java Outdated
Comment thread ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamInput.java Outdated
Comment thread ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java Outdated
Comment thread ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java Outdated
Comment thread ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java Outdated
Co-Authored-By: szetszwo <szetszwo@apache.org>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@peterxcli peterxcli requested a review from szetszwo June 20, 2026 17:57

@szetszwo szetszwo left a comment

Contributor

@peterxcli , thanks for the update!

Comment on lines +71 to +78
    reply.retain();
    for (CompletableFuture<ReferenceCountedObject<DataStreamReply>> pending;
        (pending = pendingReads.poll()) != null; ) {
      if (pending.complete(reply)) {
        return;
      }
    }
    replies.add(reply);

Contributor

Since all entries in pendingReads must not be completed, remove the loop.

    reply.retain();
    final CompletableFuture<ReferenceCountedObject<DataStreamReply>> pending = pendingReads.poll();
    if (pending != null) {
      final boolean completed = pending.complete(reply);
      Preconditions.assertTrue(completed);
      return;
    }
    replies.add(reply);
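
The handoff between arriving replies and waiting reads can be sketched in isolation as follows. This is a simplified stand-in with a hypothetical `HandoffSketch` class, not the actual implementation: reference counting is left out, and coarse `synchronized` methods are an assumption made for brevity.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Sketch only: a reply/read handoff with stand-in types, not the actual Ratis code. */
final class HandoffSketch<T> {
  private final Queue<CompletableFuture<T>> pendingReads = new ArrayDeque<>();
  private final Queue<T> replies = new ArrayDeque<>();

  /** Called when a reply arrives: hand it to the oldest waiting read, else buffer it. */
  synchronized void receive(T reply) {
    final CompletableFuture<T> pending = pendingReads.poll();
    if (pending != null) {
      // Assumes callers never complete or cancel the returned future themselves.
      if (!pending.complete(reply)) {
        throw new IllegalStateException("pending read was already completed");
      }
      return;
    }
    replies.add(reply);
  }

  /** Called by the reader: return a buffered reply if any, else register a pending read. */
  synchronized CompletableFuture<T> readAsync() {
    final T buffered = replies.poll();
    if (buffered != null) {
      return CompletableFuture.completedFuture(buffered);
    }
    final CompletableFuture<T> future = new CompletableFuture<>();
    pendingReads.add(future);
    return future;
  }

  public static void main(String[] args) {
    final HandoffSketch<String> queue = new HandoffSketch<>();
    final CompletableFuture<String> early = queue.readAsync(); // read issued before any reply
    queue.receive("a");                                        // completes the early read
    queue.receive("b");                                        // buffered until the next read
    System.out.println(early.join() + queue.readAsync().join());
  }
}
```

At most one of the two queues is non-empty at any time, which is why a single `poll()` is enough as long as pending futures are only ever completed by `receive`.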

Member Author

But if the caller cancels one of the returned futures, the reply is lost and another pending future might never get it.

Related testcase added in this patch:

@Test
public void testReceiveSkipsCancelledPendingRead() throws Exception {
  final RaftPeer follower = newPeer("follower");
  final RecordingDataStreamClientRpc dataStreamClientRpc = new RecordingDataStreamClientRpc();
  try (DataStreamClient dataStreamClient = newDataStreamClient(follower, dataStreamClientRpc);
       DataStreamInput input = dataStreamClient.streamReadOnly(ByteBuffer.wrap(new byte[] {1}))) {
    final CompletableFuture<ReferenceCountedObject<DataStreamReply>> cancelled = input.readAsync();
    final CompletableFuture<ReferenceCountedObject<DataStreamReply>> active = input.readAsync();
    cancelled.cancel(false);
    Assertions.assertEquals(follower.getId(), dataStreamClientRpc.getRequest().getServerId());
    final DataStreamReplyByteBuf reply = DataStreamReplyByteBuf.newBuilder()
        .setClientId(ClientId.randomId())
        .setType(Type.STREAM_DATA)
        .setStreamId(1)
        .setStreamOffset(0)
        .setBuf(Unpooled.EMPTY_BUFFER)
        .setSuccess(true)
        .build();
    dataStreamClientRpc.receive(reply);
    Assertions.assertTrue(active.isDone());
    final ReferenceCountedObject<DataStreamReply> received = active.getNow(null);
    Assertions.assertNotNull(received);
    final DataStreamReplyByteBuf data = Assertions.assertInstanceOf(DataStreamReplyByteBuf.class, received.get());
    Assertions.assertEquals(Type.STREAM_DATA, data.getType());
    received.release();
  }
}

And since you assert in that case, should we remove the above test case and add documentation telling users not to cancel the returned future?
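
The cancellation concern comes down to standard JDK behaviour: `CompletableFuture.complete` returns false once the future has been cancelled. A minimal sketch using plain JDK classes (the class name is made up for illustration):

```java
import java.util.concurrent.CompletableFuture;

/** Sketch only: shows the JDK behaviour behind the cancelled-read discussion. */
final class CancelledFutureSketch {
  /** Returns whether complete() succeeds on a future that the caller has already cancelled. */
  static boolean completeAfterCancel() {
    final CompletableFuture<String> pending = new CompletableFuture<>();
    pending.cancel(false);            // the caller cancels the returned future
    return pending.complete("reply"); // false: the future was already completed by the cancellation
  }

  public static void main(String[] args) {
    System.out.println(completeAfterCancel()); // prints "false"
  }
}
```

So with a single `poll()` followed by an assertion on `complete`, a cancelled pending read trips the assertion instead of being skipped.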

Member Author

I have removed testReceiveSkipsCancelledPendingRead for now. If you think we should change the API behaviour, feel free to ask me to add it back. Thanks!

3c8c478

Comment thread ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java Outdated
Comment on lines +370 to +375
            .setReleaseMethod(r -> {
              if (r != null) {
                Preconditions.assertSame(reply, r, "reply");
                reply.release();
              }
            }).build();

Contributor

Let's change Preconditions.assertSame to return DataStreamReplyByteBuf:

        final ReferenceCountedObject<DataStreamReply> ref = ReferenceCountedObject.<DataStreamReply>newBuilder()
            .setValue((DataStreamReplyByteBuf) msg)
            .setReleaseMethod(r -> Preconditions.assertSame(reply, r, "reply").release())
            .build();
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -88,9 +88,10 @@ public interface Preconditions {
         () -> name + ": expected == " + expected + " but computed == " + computed);
   }
 
-  static void assertSame(Object expected, Object computed, String name) {
+  static <T> T assertSame(T expected, Object computed, String name) {
     assertTrue(expected == computed,
         () -> name + ": expected == " + expected + " but computed == " + computed);
+    return expected;
   } 

Contributor

Oops, this actually does not work. We need to keep the null check first.

            .setReleaseMethod(r -> {
              if (r != null) {
                Preconditions.assertSame(reply, r, "reply");
                reply.release();
              }
            }).build();

Member Author

Thanks, I plan to add a static reference-counted (RC) builder function for DataStreamReply:

public static ReferenceCountedObject<DataStreamReply> asReferenceCounted(DataStreamReplyByteBuf reply) {
  Objects.requireNonNull(reply, "reply == null");
  return ReferenceCountedObject.<DataStreamReply>newBuilder()
      .setValue(reply)
      .setReleaseMethod(r -> {
        if (r != null) {
          Preconditions.assertSame(reply, r, "reply").release();
        }
      })
      .build();
}

so that production and test code can share the same RC DataStreamReply construction code.
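
A self-contained sketch of the guarded release callback, using stand-in helpers rather than the Ratis `Preconditions` and `ReferenceCountedObject` classes. It assumes, as the comments above imply, that the release callback may be invoked with null; `Resource` and `releaseMethod` are hypothetical names.

```java
import java.util.function.Consumer;

/** Sketch only: stand-in helpers, not the actual Ratis Preconditions or ReferenceCountedObject. */
final class ReleaseSketch {
  /** Identity check that returns the expected object so calls can be chained. */
  static <T> T assertSame(T expected, Object computed, String name) {
    if (expected != computed) {
      throw new IllegalStateException(
          name + ": expected == " + expected + " but computed == " + computed);
    }
    return expected;
  }

  /** Hypothetical resource that counts how many times it was released. */
  static final class Resource {
    int released = 0;

    void release() {
      released++;
    }
  }

  /** Builds a release callback that ignores null and otherwise checks identity before releasing. */
  static Consumer<Resource> releaseMethod(Resource reply) {
    return r -> {
      if (r != null) {
        assertSame(reply, r, "reply").release();
      }
    };
  }

  public static void main(String[] args) {
    final Resource reply = new Resource();
    final Consumer<Resource> release = releaseMethod(reply);
    release.accept(null);  // skipped: without the guard, assertSame would throw here
    release.accept(reply); // identity matches, so the resource is released once
    System.out.println(reply.released); // prints "1"
  }
}
```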

Comment thread ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java Outdated
Comment thread ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java Outdated
Comment thread ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java Outdated
@peterxcli

Member Author

@szetszwo I found a new class AsyncQueue in the review patch. Was it included accidentally, or do you want to use it to refactor something?

Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@peterxcli peterxcli requested a review from szetszwo June 23, 2026 08:44
@peterxcli

Member Author

@szetszwo Thanks for the review again! If you have any ideas for splitting this PR further, please feel free to ask me to do that!

@szetszwo szetszwo left a comment

Contributor

+1 the change looks good.

@szetszwo szetszwo merged commit abfe70f into apache:master Jun 23, 2026
18 checks passed
@szetszwo

Contributor

@peterxcli , thanks a lot for working hard on this!

@amaliujia , thanks for reviewing this!

@peterxcli peterxcli deleted the RATIS-2546-stream-read-client branch June 23, 2026 18:01
@peterxcli

Member Author

Thanks @szetszwo for the continuous reviews and refactoring suggestions!
Thanks @amaliujia for the review!
