Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.ratis.netty.server;

import org.apache.ratis.client.impl.OrderedAsync;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
Expand Down Expand Up @@ -47,6 +48,7 @@

import static org.apache.ratis.client.impl.ClientProtoUtils.toRaftClientRequest;
import static org.apache.ratis.client.impl.ClientProtoUtils.toRaftClientReplyProto;
import static org.apache.ratis.netty.server.DataStreamManagement.newDataStreamReplyByteBuffer;
import static org.apache.ratis.netty.server.DataStreamManagement.replyDataStreamException;

public class ReadStreamManagement {
Expand All @@ -60,24 +62,12 @@ static class ReadStream implements WritableByteChannel {
private final DataStreamReplyByteBuffer terminalReply;
private long streamOffset;

ReadStream(RaftClientRequest request, long streamId, ChannelHandlerContext ctx) {
ReadStream(RaftClientRequest request, long streamId, ChannelHandlerContext ctx, RaftClientReply terminalReply) {
this.clientId = request.getClientId();
this.streamId = streamId;
this.ctx = ctx;

final RaftClientReply reply = RaftClientReply.newBuilder()
.setRequest(request)
.setSuccess()
.build();
this.terminalReply = DataStreamReplyByteBuffer.newBuilder()
.setClientId(clientId)
.setType(Type.STREAM_HEADER)
.setStreamId(streamId)
.setStreamOffset(0)
.setBuffer(toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer())
.setSuccess(true)
.setBytesWritten(0)
.build();
this.terminalReply = newReadStreamTerminalReply(clientId, streamId, terminalReply);
}

@Override
Expand Down Expand Up @@ -186,17 +176,80 @@ private boolean processImpl(DataStreamRequestByteBuf requestBuf, ChannelHandlerC
return true;
}

final ReadStream stream = new ReadStream(request, requestBuf.getStreamId(), ctx);
requestExecutor.execute(() -> {
final CompletableFuture<RaftClientReply> readOnlyCheck;
try {
readOnlyCheck = server.submitClientRequestAsync(newDummyReadRequest(request));
} catch (IOException e) {
replyDataStreamException(server, e, request, requestBuf, ctx);
return true;
}

readOnlyCheck.whenCompleteAsync((reply, exception) -> {
if (exception != null) {
replyDataStreamException(server, exception, request, requestBuf, ctx);
return;
}

final RaftClientReply terminalReply = toReadStreamReply(request, reply);
if (!reply.isSuccess()) {
ctx.writeAndFlush(newDataStreamReplyByteBuffer(requestBuf, terminalReply));
return;
}
Comment on lines +193 to +197

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since server returns a DUMMY_SUCCESS_REPLY, we could build the failed reply directly:

      if (!readCheckReply.isSuccess()) {
        final RaftClientReply reply = RaftClientReply.newBuilder()
            .setRequest(request)
            .setSuccess(false)
            .build();
        ctx.writeAndFlush(newDataStreamReplyByteBuffer(requestBuf, reply));
        return;
      }

      final ReadStream stream = new ReadStream(request, requestBuf.getStreamId(), ctx);


final ReadStream stream = new ReadStream(request, requestBuf.getStreamId(), ctx, terminalReply);
try {
division.getStateMachine().data().query(request.getMessage(), stream);
} catch (Throwable t) {
LOG.error("{}: Failed read-only data stream query for {}", this, request, t);
}
});
}, requestExecutor);
return true;
}

private static RaftClientRequest newDummyReadRequest(RaftClientRequest request) {
final RaftClientRequest.Builder builder = RaftClientRequest.newBuilder()
.setClientId(request.getClientId())
.setGroupId(request.getRaftGroupId())
.setCallId(request.getCallId())
.setMessage(OrderedAsync.DUMMY)
.setType(request.getType())
.setRepliedCallIds(request.getRepliedCallIds())
.setSlidingWindowEntry(request.getSlidingWindowEntry())
.setRoutingTable(request.getRoutingTable())
.setTimeoutMs(request.getTimeoutMs())
.setSpanContext(request.getSpanContext());
if (request.isToLeader()) {
builder.setLeaderId(request.getServerId());
} else {
builder.setServerId(request.getServerId());
}
return builder.build();
Comment on lines +210 to +226

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a set(RaftClientRequest request) to Builder to copy everything.

}

private static RaftClientReply toReadStreamReply(RaftClientRequest request, RaftClientReply reply) {
return RaftClientReply.newBuilder()
.setRequest(request)
.setSuccess(reply.isSuccess())
.setException(reply.getException())
.setLogIndex(reply.getLogIndex())
.setCommitInfos(reply.getCommitInfos())
.build();
}

private static DataStreamReplyByteBuffer newReadStreamTerminalReply(
ClientId clientId, long streamId, RaftClientReply reply) {
return DataStreamReplyByteBuffer.newBuilder()
.setClientId(clientId)
.setType(Type.STREAM_HEADER)
.setStreamId(streamId)
.setStreamOffset(0)
.setBuffer(toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer())
.setSuccess(reply.isSuccess())
.setBytesWritten(0)
.setCommitInfos(reply.getCommitInfos())
.build();
}

@Override
public String toString() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1113,11 +1113,12 @@ private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request)
if (request.getType().getRead().getPreferNonLinearizable()
|| readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
return reply;
}
return queryStateMachine(request);
} else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE){
if (reply != null) {
return reply;
}
return isDummyRead(request) ? CompletableFuture.completedFuture(newSuccessReply(request))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to learn the context, why do we need this dummy read in Ratis streaming read while we do not have it for Ratis streaming write?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is to run the linearizable check; see #1469 (comment)

: queryStateMachine(request);
Comment on lines +1119 to +1120

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a DUMMY_SUCCESS_REPLY and move it to queryStateMachine:

  static final CompletableFuture<RaftClientReply> DUMMY_SUCCESS_REPLY
      = CompletableFuture.completedFuture(RaftClientReply.newBuilder().setSuccess().build());
  CompletableFuture<RaftClientReply> queryStateMachine(RaftClientRequest request) {
    if (request.getType().getRead().getDummy()) {
      return DUMMY_SUCCESS_REPLY;
    }
    return processQueryFuture(stateMachine.query(request.getMessage()), request);
  }

} else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
final LeaderStateImpl leader = role.getLeaderState().orElse(null);
final CompletableFuture<Long> replyFuture;
if (leader != null) {
Expand All @@ -1136,12 +1137,17 @@ private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request)
return replyFuture
.thenCompose(readIndex -> getState().getReadRequests().waitToAdvance(readIndex,
() -> getReadException("add", snapshotInstallationHandler.getInProgressInstallSnapshotIndex(), false)))
.thenCompose(readIndex -> queryStateMachine(request))
.thenCompose(readIndex -> isDummyRead(request)
? CompletableFuture.completedFuture(newSuccessReply(request)) : queryStateMachine(request))
.exceptionally(e -> readException2Reply(request, e));
} else {
throw new IllegalStateException("Unexpected read option: " + readOption);
}
}
private static boolean isDummyRead(RaftClientRequest request) {
return request.getMessage() != null && OrderedAsync.DUMMY.getContent().equals(request.getMessage().getContent());
}
Comment on lines +1147 to +1149

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot use DUMMY since user applications can set any message. They may already have used it in their messages. We probably need to add a flag to the proto:

+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -302,6 +302,7 @@ message ForwardRequestTypeProto {
 message ReadRequestTypeProto {
   bool preferNonLinearizable = 1;
   bool readAfterWriteConsistent = 2;
+  bool dummy = 3;
 }


private RaftClientReply readException2Reply(RaftClientRequest request, Throwable e) {
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof StateMachineException ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.client.impl.OrderedAsync;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
Expand All @@ -28,10 +29,12 @@
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataApi;
Expand Down Expand Up @@ -59,14 +62,17 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

class TestDataStreamManagement {
Expand Down Expand Up @@ -96,8 +102,9 @@ public void query(Message request, WritableByteChannel stream) {
assertTrue(management.process(readOnlyRequest.request, embeddedChannel.pipeline().firstContext()));
assertEquals(0, readOnlyRequest.headerBuf.refCnt());

JavaUtils.attempt(() -> assertNotNull(streamRef.get()), 10,
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS), "read-only stream", null);
final WritableByteChannel stream = streamRef.get();
assertNotNull(stream);
stream.write(response.asReadOnlyByteBuffer());
stream.close();

Expand All @@ -116,6 +123,99 @@ public void query(Message request, WritableByteChannel stream) {
assertTrue(ClientProtoUtils.getRaftClientReply(replies.get(1)).isSuccess());
} finally {
embeddedChannel.finishAndReleaseAll();
management.shutdown();
}
}

@Test
void readOnlyRequestWaitsForLinearizableCheck() throws Exception {
final RaftPeerId serverId = RaftPeerId.valueOf("s1");
final ClientId clientId = ClientId.randomId();
final RaftGroupId groupId = RaftGroupId.randomId();
final ByteString query = ByteString.copyFromUtf8("query");
final CompletableFuture<RaftClientReply> readOnlyCheck = new CompletableFuture<>();
final AtomicReference<RaftClientRequest> submittedReadOnlyCheck = new AtomicReference<>();
final AtomicReference<Message> messageRef = new AtomicReference<>();
final AtomicReference<WritableByteChannel> streamRef = new AtomicReference<>();

final DataApi dataApi = new DataApi() {
@Override
public void query(Message request, WritableByteChannel stream) {
messageRef.set(request);
streamRef.set(stream);
}
};
final ReadStreamManagement management = newReadStreamManagement(serverId, groupId, dataApi, request -> {
submittedReadOnlyCheck.set(request);
return readOnlyCheck;
});
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
final ReadOnlyRequest readOnlyRequest = newReadOnlyRequest(clientId, serverId, groupId, 1L, query);

try {
assertTrue(management.process(readOnlyRequest.request, embeddedChannel.pipeline().firstContext()));
assertEquals(0, readOnlyRequest.headerBuf.refCnt());

final RaftClientRequest checkRequest = submittedReadOnlyCheck.get();
assertNotNull(checkRequest);
assertEquals(OrderedAsync.DUMMY.getContent(), checkRequest.getMessage().getContent());
assertNull(streamRef.get(), "state machine query should wait for the read-only check");

readOnlyCheck.complete(RaftClientReply.newBuilder().setRequest(checkRequest).setSuccess().build());
JavaUtils.attempt(() -> assertNotNull(streamRef.get()), 10,
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS), "linearizable read-only stream", null);
assertEquals(query, messageRef.get().getContent());
} finally {
embeddedChannel.finishAndReleaseAll();
management.shutdown();
}
}

@Test
void readOnlyCheckFailureSkipsStateMachineQuery() throws Exception {
final RaftPeerId serverId = RaftPeerId.valueOf("s1");
final ClientId clientId = ClientId.randomId();
final RaftGroupId groupId = RaftGroupId.randomId();
final ByteString query = ByteString.copyFromUtf8("query");
final AtomicBoolean queryCalled = new AtomicBoolean();

final DataApi dataApi = new DataApi() {
@Override
public void query(Message request, WritableByteChannel stream) {
queryCalled.set(true);
}
};
final ReadStreamManagement management = newReadStreamManagement(serverId, groupId, dataApi, request ->
CompletableFuture.completedFuture(RaftClientReply.newBuilder()
.setRequest(request)
.setException(new ReadIndexException("read index failed"))
.build()));
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
final ReadOnlyRequest readOnlyRequest = newReadOnlyRequest(clientId, serverId, groupId, 1L, query);

try {
assertTrue(management.process(readOnlyRequest.request, embeddedChannel.pipeline().firstContext()));
assertEquals(0, readOnlyRequest.headerBuf.refCnt());

final List<DataStreamReply> replies = new ArrayList<>();
JavaUtils.attempt(() -> {
for (Object outbound; (outbound = embeddedChannel.readOutbound()) != null;) {
replies.add((DataStreamReply) outbound);
}
assertEquals(1, replies.size());
}, 10, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS), "read-only check failure reply", null);

assertFalse(queryCalled.get(), "state machine query should not run when the read-only check fails");
final DataStreamReply reply = replies.get(0);
assertEquals(Type.STREAM_HEADER, reply.getType());
assertFalse(reply.isSuccess());
final RaftClientReply clientReply = ClientProtoUtils.getRaftClientReply(reply);
assertFalse(clientReply.isSuccess());
assertNotNull(clientReply.getReadIndexException());
assertEquals(serverId, clientReply.getServerId());
} finally {
embeddedChannel.finishAndReleaseAll();
management.shutdown();
}
}

Expand Down Expand Up @@ -231,13 +331,19 @@ private static class ReadOnlyRequest {

private static ReadStreamManagement newReadStreamManagement(
RaftPeerId serverId, RaftGroupId groupId, DataApi dataApi) {
return newReadStreamManagement(serverId, groupId, dataApi, TestDataStreamManagement::successReply);
}

private static ReadStreamManagement newReadStreamManagement(RaftPeerId serverId, RaftGroupId groupId,
DataApi dataApi, Function<RaftClientRequest, CompletableFuture<RaftClientReply>> submitClientRequestAsync) {
final StateMachine stateMachine = new BaseStateMachine() {
@Override
public DataApi data() {
return dataApi;
}
};
final RaftServer server = newRaftServer(serverId, new RaftProperties(), groupId, newDivision(stateMachine));
final RaftServer server = newRaftServer(serverId, new RaftProperties(), groupId, newDivision(stateMachine),
submitClientRequestAsync);
return new ReadStreamManagement(server);
}

Expand Down Expand Up @@ -275,6 +381,16 @@ private static RaftServer newRaftServer(RaftPeerId serverId, RaftProperties prop

private static RaftServer newRaftServer(RaftPeerId serverId, RaftProperties properties,
RaftGroupId groupId, RaftServer.Division division) {
return newRaftServer(serverId, properties, groupId, division, TestDataStreamManagement::successReply);
}

private static CompletableFuture<RaftClientReply> successReply(RaftClientRequest request) {
return CompletableFuture.completedFuture(RaftClientReply.newBuilder().setRequest(request).setSuccess().build());
}

private static RaftServer newRaftServer(RaftPeerId serverId, RaftProperties properties,
RaftGroupId groupId, RaftServer.Division division,
Function<RaftClientRequest, CompletableFuture<RaftClientReply>> submitClientRequestAsync) {
return (RaftServer) Proxy.newProxyInstance(RaftServer.class.getClassLoader(), new Class<?>[]{RaftServer.class},
(proxy, method, args) -> {
switch (method.getName()) {
Expand All @@ -287,6 +403,8 @@ private static RaftServer newRaftServer(RaftPeerId serverId, RaftProperties prop
return division;
}
throw new IOException("Division not found: " + args[0]);
case "submitClientRequestAsync":
return submitClientRequestAsync.apply((RaftClientRequest) args[0]);
case "close":
return null;
case "toString":
Expand Down
Loading