Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ if(BUILD_TESTING)
-DSCU_LOG_LEVEL=5
)
target_include_directories(network_framework_test PRIVATE SYSTEM ${GMOCK_INCLUDE_DIRS})
target_link_libraries(network_framework_test ${GMOCK_MAIN_LIBRARIES} ${GMOCK_LIBRARIES})
target_link_libraries(network_framework_test -fsanitize=address ${GMOCK_MAIN_LIBRARIES} ${GMOCK_LIBRARIES})
target_compile_options(network_framework_test PRIVATE -fsanitize=address -fno-omit-frame-pointer)

target_link_libraries(test_scorpio_udp_framework
buffered_file_saver
Expand Down
25 changes: 24 additions & 1 deletion SCORPIO_UDP_PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ Last packet:

### Subcommands

Each command packet carries a 1-byte subcommand as the first payload byte.
Each command packet carries a 1-byte subcommand as the first payload byte. `CONNECT` and `DISCONNECT` packets additionally carry an 8-byte [connection id](#connection-identity) immediately after the subcommand byte (every subcommand of both commands).

**PING subcommands:**

Expand Down Expand Up @@ -154,6 +154,17 @@ Each command packet carries a 1-byte subcommand as the first payload byte.
| 2 | REJECTED | responder -> initiator |
| 3 | ALREADY_DISCONNECTED | responder -> initiator |

Both CONNECT and DISCONNECT packets lay out their payload as the subcommand byte followed by the 8-byte connection id:

```
+------+---------------+-----------------------------+
| cmd | subcommand | connection id |
| 1 B | 1 B | 8 B (uint64, big-endian) |
+------+---------------+-----------------------------+
```

The responder always echoes back the connection id it received, unchanged. See [Connection identity](#connection-identity).

**CREATE_STREAM subcommands:**

| Value | Name | Direction |
Expand Down Expand Up @@ -232,6 +243,18 @@ sequenceDiagram
end
```

### Connection identity

Every connection carries a **connection id**: a `uint64_t` (`ConnectionId`) chosen at random by the initiator when `connect()` is called. It is sent in every CONNECT and DISCONNECT packet (all subcommands) and the acceptor stores and echoes it back unchanged.

The connection id is **not** a demultiplexing key - a connection is still identified solely by its remote `ip:port`, and there is at most one connection per peer address. The id exists only as a guard against a peer that dies and restarts so fast that the other side never noticed the old connection went stale.

How the id is used:

- **CONNECT for an existing connection.** If the stored id matches, the acceptor replies `ALREADY_CONNECTED`. If it differs (a restarted peer reusing the same address with a fresh id), the acceptor **soft-panics its own stale connection and sends nothing back**. The old connection drops to `ERROR` (no longer alive) and is evicted on the next lookup; the initiator keeps retrying CONNECT every heartbeat and gets accepted once the stale entry is gone.
- **ACCEPTED / REJECTED / ALREADY_CONNECTED responses.** Ignored if the id does not match the local connection (`ACCEPTED` with a mismatched id also draws an `ERROR` reply). This prevents a late reply meant for a previous incarnation from advancing the current connection's state.
- **DISCONNECT.** Accepted (replied `ACCEPTED`, connection closed) only when the id matches a live connection for that address; otherwise the responder replies `ALREADY_DISCONNECTED`. A late DISCONNECT response addressed to a previous incarnation is therefore ignored and cannot tear down a freshly re-established connection.

## Stream Lifecycle

### State machine
Expand Down
20 changes: 18 additions & 2 deletions include/scorpio_utils/network/scorpio_udp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <memory>
#include <mutex>
#include <optional>
#include <random>
#include <set>
#include <string>
#include <string_view>
Expand Down Expand Up @@ -57,6 +58,7 @@ using SeqNumber = uint32_t;
using SeqNumberComplement = uint32_t;
using StreamNumber = uint16_t;
using FramesLeft = uint16_t;
using ConnectionId = uint64_t;

#ifdef SCU_UDP_MOCK
using TimeProvider = scorpio_utils::testing::MockTimeProvider;
Expand Down Expand Up @@ -96,7 +98,7 @@ struct Code {
PONG,
};

enum class ConnectionSubCommands : uint8_t {
enum class ConnectSubCommands : uint8_t {
CONNECT,
ACCEPTED,
REJECTED,
Expand Down Expand Up @@ -337,6 +339,7 @@ class ScorpioUdpConnection : public std::enable_shared_from_this<ScorpioUdpConne
friend class ScorpioUdpStream;
const Ipv4 _remote_ip;
const Port _remote_port;
const ConnectionId _connection_id;
std::atomic<size_t> _sequence_number;
std::atomic<bool> _panic;
std::atomic<State> _state;
Expand Down Expand Up @@ -371,8 +374,11 @@ class ScorpioUdpConnection : public std::enable_shared_from_this<ScorpioUdpConne
void send_heartbeat();
void processing_thread();

ScorpioUdpConnection(Ipv4 remote_ip, Port remote_port, std::shared_ptr<ScorpioUdp> parent);
ScorpioUdpConnection(
Ipv4 remote_ip, Port remote_port, ConnectionId connection_id,
std::shared_ptr<ScorpioUdp> parent);
void panic(std::string&& message);
void panic_soft(std::string&& message);
auto generate_packets(
Code code, const std::vector<uint8_t>& data, std::optional<StreamNumber> stream_number = std::nullopt,
std::optional<std::reference_wrapper<std::atomic<size_t>>> sequence_number = std::nullopt);
Expand Down Expand Up @@ -453,6 +459,9 @@ class ScorpioUdpConnection : public std::enable_shared_from_this<ScorpioUdpConne
SCU_ALWAYS_INLINE auto get_socket() const noexcept {
return _parent;
}
SCU_ALWAYS_INLINE auto connection_id() const noexcept {
return _connection_id;
}
};

class ScorpioUdp : public std::enable_shared_from_this<ScorpioUdp> {
Expand All @@ -463,6 +472,8 @@ class ScorpioUdp : public std::enable_shared_from_this<ScorpioUdp> {
#else
scorpio_utils::network::UdpSocket& _socket;
#endif
std::mutex _random_engine_mutex;
std::mt19937_64 _random_engine;
std::atomic<size_t> _mock_sequence_number;
std::atomic<bool> _auto_accept;
std::atomic<bool> _stop;
Expand Down Expand Up @@ -522,6 +533,11 @@ class ScorpioUdp : public std::enable_shared_from_this<ScorpioUdp> {
void pull_awaiting_connections(std::weak_ptr<ScorpioUdpConnection> connection_weak);
std::shared_ptr<ScorpioUdpConnection> get_connection(Ipv4 remote_ip, Port remote_port);

[[nodiscard]] SCU_ALWAYS_INLINE auto get_random_number() {
std::lock_guard lock(_random_engine_mutex);
return _random_engine();
}

public:
SCU_ALWAYS_INLINE bool SCU_EAGER_SELECT_IS_READY() noexcept {
return _new_connections && _new_connections->SCU_EAGER_SELECT_IS_READY();
Expand Down
Loading
Loading