From 1d7725749ff6eca7717838ef366be3d89e4bdfef Mon Sep 17 00:00:00 2001 From: igorosky Date: Thu, 18 Jun 2026 01:03:43 +0200 Subject: [PATCH 1/4] Introduction of connection id Signed-off-by: igorosky --- include/scorpio_utils/network/scorpio_udp.hpp | 20 ++- src/network/scorpio_udp.cpp | 164 +++++++++++++----- test/network/test_scorpio_udp.cpp | 8 +- test/network/test_scorpio_udp_framework.cpp | 28 +-- test/network/test_scorpio_udp_gmock.cpp | 4 +- 5 files changed, 163 insertions(+), 61 deletions(-) diff --git a/include/scorpio_utils/network/scorpio_udp.hpp b/include/scorpio_utils/network/scorpio_udp.hpp index a0716b1..6eea57f 100644 --- a/include/scorpio_utils/network/scorpio_udp.hpp +++ b/include/scorpio_utils/network/scorpio_udp.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -57,6 +58,7 @@ using SeqNumber = uint32_t; using SeqNumberComplement = uint32_t; using StreamNumber = uint16_t; using FramesLeft = uint16_t; +using ConnectionId = uint64_t; #ifdef SCU_UDP_MOCK using TimeProvider = scorpio_utils::testing::MockTimeProvider; @@ -96,7 +98,7 @@ struct Code { PONG, }; - enum class ConnectionSubCommands : uint8_t { + enum class ConnectSubCommands : uint8_t { CONNECT, ACCEPTED, REJECTED, @@ -337,6 +339,7 @@ class ScorpioUdpConnection : public std::enable_shared_from_this _sequence_number; std::atomic _panic; std::atomic _state; @@ -371,8 +374,11 @@ class ScorpioUdpConnection : public std::enable_shared_from_this parent); + ScorpioUdpConnection( + Ipv4 remote_ip, Port remote_port, ConnectionId connection_id, + std::shared_ptr parent); void panic(std::string&& message); + void panic_soft(std::string&& message); auto generate_packets( Code code, const std::vector& data, std::optional stream_number = std::nullopt, std::optional>> sequence_number = std::nullopt); @@ -453,6 +459,9 @@ class ScorpioUdpConnection : public std::enable_shared_from_this { @@ -463,6 +472,8 @@ class ScorpioUdp : public std::enable_shared_from_this { #else scorpio_utils::network::UdpSocket& _socket; #endif + std::mutex _random_engine_mutex; + std::mt19937_64 _random_engine; std::atomic _mock_sequence_number; std::atomic _auto_accept; std::atomic _stop; @@ -522,6 +533,11 @@ class ScorpioUdp : public std::enable_shared_from_this { void pull_awaiting_connections(std::weak_ptr connection_weak); std::shared_ptr get_connection(Ipv4 remote_ip, Port remote_port); + [[nodiscard]] SCU_ALWAYS_INLINE auto get_random_number() { + std::lock_guard lock(_random_engine_mutex); + return _random_engine(); + } + public: SCU_ALWAYS_INLINE bool SCU_EAGER_SELECT_IS_READY() noexcept { return _new_connections && _new_connections->SCU_EAGER_SELECT_IS_READY(); diff --git a/src/network/scorpio_udp.cpp b/src/network/scorpio_udp.cpp index a87d147..34d69b1 100644 --- a/src/network/scorpio_udp.cpp +++ b/src/network/scorpio_udp.cpp @@ -37,6 +37,9 @@ using scorpio_utils::network::StreamNumber; using scorpio_utils::network::UdpData; using scorpio_utils::network::TimeProvider; +static_assert(sizeof(Code::ConnectSubCommands) == 1, "ConnectSubCommands size is assumed to be 1 byte"); +static_assert(sizeof(Code::DisconnectSubCommands) == 1, "DisconnectSubCommands size is assumed to be 1 byte"); + #define AS_BYTE(x) (SCU_AS(uint8_t, x)) struct PanicException : public std::exception { }; @@ -163,6 +166,7 @@ ScorpioUdp::ScorpioUdp( #ifdef SCU_UDP_MOCK _socket(socket), #endif + _random_engine(SCU_AS(uint64_t, std::random_device{ }()) << 32 | SCU_AS(uint64_t, std::random_device{ }())), _auto_accept(false), _stop(true), _logger(logger), @@ -389,43 +393,63 @@ void ScorpioUdp::handle_ping_packet(const MessageHeader& header, const UdpData& } void ScorpioUdp::handle_connect_packet(const MessageHeader& header, const UdpData& udp_data) { - if (udp_data.data.size() - header.data_offset != 1) { + if (udp_data.data.size() - header.data_offset != sizeof(Code::ConnectSubCommands) + sizeof(ConnectionId)) { // TODO(@Igor): Handle error properly SCU_LOG_ERROR(_logger, - "Invalid CONNECT packet size: expected 1 byte for subcommand, got {}", - udp_data.data.size() - header.data_offset); + "Invalid CONNECT packet size: expected {} byte for subcommand, got {}", + sizeof(Code::ConnectSubCommands) + sizeof(ConnectionId), udp_data.data.size() - header.data_offset); return; } SCU_LOG_TRACE(_logger, "Handling CONNECT packet from {}:{}. Subcommand: {}", - udp_data.ip.str(), udp_data.port, SCU_AS(Code::ConnectionSubCommands, udp_data.data[header.data_offset])); - switch (SCU_AS(Code::ConnectionSubCommands, udp_data.data[header.data_offset])) { - case Code::ConnectionSubCommands::CONNECT: { - if (get_connection(udp_data.ip, udp_data.port)) { - send_or_panic(std::nullopt, _mock_sequence_number, Code::CONNECT, - udp_data.ip, udp_data.port, - { AS_BYTE(Code::ConnectionSubCommands::ALREADY_CONNECTED) }, - "Failed to send ALREADY_CONNECTED response"); + udp_data.ip.str(), udp_data.port, SCU_AS(Code::ConnectSubCommands, udp_data.data[header.data_offset])); + size_t offset = header.data_offset + sizeof(Code::ConnectSubCommands); + ConnectionId connection_id; + SCU_DO_AND_ASSERT(scorpio_utils::network::network_to_host(udp_data.data, &connection_id, offset), + "Failed to parse connection_id from CONNECT packet"); + switch (SCU_AS(Code::ConnectSubCommands, udp_data.data[header.data_offset])) { + case Code::ConnectSubCommands::CONNECT: { + const auto connection = get_connection(udp_data.ip, udp_data.port); + std::vector connect_data; + connect_data.resize(sizeof(Code::ConnectSubCommands) + sizeof(ConnectionId)); + offset = sizeof(Code::ConnectSubCommands); + SCU_DO_AND_ASSERT(host_to_network(connection_id, connect_data, offset), + "Failed to convert connection ID to network format for CONNECT packet"); + if (connection) { + if (connection->connection_id() != connection_id) { + // TODO(@Igor): Handle error properly + SCU_LOG_ERROR(_logger, + "Received CONNECT for existing connection with different connection_id. ip: {}, port: {}, " + "existing connection_id: {}, received connection_id: {}", + udp_data.ip.str(), udp_data.port, connection->connection_id(), connection_id); + connection->panic_soft("Received CONNECT for connection with same ip and port but different connection_id"); + // Lets send nothing here it is not the cleanest solution but this way we will not create a new connection + // and will not reject it either. The client will try again in some time + // and meanwhile some old packets will be send/dropped + } else { + connect_data[0] = AS_BYTE(Code::ConnectSubCommands::ALREADY_CONNECTED); + send_or_panic(std::nullopt, _mock_sequence_number, Code::CONNECT, + udp_data.ip, udp_data.port, + connect_data, "Failed to send ALREADY_CONNECTED response"); + } } else if (_auto_accept.load(std::memory_order_relaxed)) { std::shared_ptr new_connection(new ScorpioUdpConnection( - udp_data.ip, udp_data.port, shared_from_this())); + udp_data.ip, udp_data.port, connection_id, shared_from_this())); new_connection->_start_signal.notify(100000); new_connection->_state.store(ScorpioUdpConnection::State::CONNECTING); _connections.insert({ { udp_data.ip, udp_data.port }, new_connection }); SCU_ASSERT(_new_connections, "Channel must exist if auto accept is enabled"); _new_connections->send(new_connection); + connect_data[0] = AS_BYTE(Code::ConnectSubCommands::ACCEPTED); send_or_panic(std::nullopt, _mock_sequence_number, Code::CONNECT, - udp_data.ip, udp_data.port, - { AS_BYTE(Code::ConnectionSubCommands::ACCEPTED) }, - "Failed to send ACCEPTED response"); + udp_data.ip, udp_data.port, connect_data, "Failed to send ACCEPTED response"); new_connection->connected(); } else { + connect_data[0] = AS_BYTE(Code::ConnectSubCommands::REJECTED); send_or_panic(std::nullopt, _mock_sequence_number, Code::CONNECT, - udp_data.ip, udp_data.port, - { AS_BYTE(Code::ConnectionSubCommands::REJECTED) }, - "Failed to send REJECTED response"); + udp_data.ip, udp_data.port, connect_data, "Failed to send REJECTED response"); } } break; - case Code::ConnectionSubCommands::ACCEPTED: { + case Code::ConnectSubCommands::ACCEPTED: { auto connection = get_connection(udp_data.ip, udp_data.port); if (!connection) { SCU_LOG_ERROR(_logger, "Received ACCEPTED for non-existing connection ip: {}, port: {}", @@ -433,29 +457,45 @@ void ScorpioUdp::handle_connect_packet(const MessageHeader& header, const UdpDat // TODO(@Igor): Handle error properly send_or_panic(std::nullopt, _mock_sequence_number, Code::ERROR, udp_data.ip, udp_data.port, { }, "Failed to send ERROR response for ACCEPTED subcommand for non-existing connection"); + } else if (connection->connection_id() != connection_id) { + SCU_LOG_ERROR(_logger, "Received ACCEPTED for connection with different connection_id. ip: {}, port: {}, " + "existing connection_id: {}, received connection_id: {}", + udp_data.ip.str(), udp_data.port, connection->connection_id(), connection_id); + // TODO(@Igor): Handle error properly + send_or_panic(std::nullopt, _mock_sequence_number, Code::ERROR, udp_data.ip, udp_data.port, + { }, "Failed to send ERROR response for ACCEPTED subcommand for connection with different connection_id"); } else if (!connection->connected()) { SCU_LOG_ERROR(_logger, "Received ACCEPTED for connection not in CONNECTING state. ip: {}, port: {}", udp_data.ip.str(), udp_data.port); // TODO(@Igor): Handle error properly } } break; - case Code::ConnectionSubCommands::REJECTED: { + case Code::ConnectSubCommands::REJECTED: { auto connection = get_connection(udp_data.ip, udp_data.port); if (!connection) { // TODO(@Igor): Handle error properly SCU_LOG_ERROR(_logger, "Received REJECTED for non-existing connection ip: {}, port: {}", udp_data.ip.str(), udp_data.port); + } else if (connection->connection_id() != connection_id) { + SCU_LOG_ERROR(_logger, "Received REJECTED for connection with different connection_id. ip: {}, port: {}, " + "existing connection_id: {}, received connection_id: {}", + udp_data.ip.str(), udp_data.port, connection->connection_id(), connection_id); + // TODO(@Igor): Handle error properly } else { SCU_LOG_INFO(_logger, "Connection rejected by {}:{}", udp_data.ip.str(), udp_data.port); connection->_state = ScorpioUdpConnection::State::REJECTED; } } break; - case Code::ConnectionSubCommands::ALREADY_CONNECTED: { + case Code::ConnectSubCommands::ALREADY_CONNECTED: { auto connection = get_connection(udp_data.ip, udp_data.port); ScorpioUdpConnection::State expected = ScorpioUdpConnection::State::CONNECTING; if (!connection) { SCU_LOG_ERROR(_logger, "Received ALREADY_CONNECTED for non-existing connection ip: {}, port: {}", udp_data.ip.str(), udp_data.port); + } else if (connection->connection_id() != connection_id) { + SCU_LOG_ERROR(_logger, "Received ALREADY_CONNECTED for connection with different connection_id. ip: {}, port: {}, " + "existing connection_id: {}, received connection_id: {}", + udp_data.ip.str(), udp_data.port, connection->connection_id(), connection_id); } else if (connection->_state.compare_exchange_strong(expected, ScorpioUdpConnection::State::CONNECTED, std::memory_order_relaxed, std::memory_order_relaxed)) { SCU_LOG_INFO(_logger, "Connection established (ALREADY_CONNECTED) with {}:{}", @@ -474,29 +514,38 @@ void ScorpioUdp::handle_connect_packet(const MessageHeader& header, const UdpDat } void ScorpioUdp::handle_disconnect_packet(const MessageHeader& header, const UdpData& udp_data) { - if (udp_data.data.size() - header.data_offset != 1) { + if (udp_data.data.size() - header.data_offset != sizeof(Code::DisconnectSubCommands) + sizeof(ConnectionId)) { // TODO(@Igor): Handle error properly SCU_LOG_ERROR(_logger, - "Invalid DISCONNECT packet size: expected 1 byte for subcommand, got {}", - udp_data.data.size() - header.data_offset); + "Invalid DISCONNECT packet size: expected {} byte for subcommand, got {}", + sizeof(Code::DisconnectSubCommands) + sizeof(ConnectionId), udp_data.data.size() - header.data_offset); return; } SCU_LOG_TRACE(_logger, "Handling DISCONNECT packet from {}:{}. Subcommand: {}", udp_data.ip.str(), udp_data.port, SCU_AS(Code::DisconnectSubCommands, udp_data.data[header.data_offset])); + size_t offset = header.data_offset + sizeof(Code::DisconnectSubCommands); + ConnectionId connection_id; + SCU_DO_AND_ASSERT(scorpio_utils::network::network_to_host(udp_data.data, &connection_id, offset), + "Failed to parse connection_id from DISCONNECT packet"); switch (SCU_AS(Code::DisconnectSubCommands, udp_data.data[header.data_offset])) { case Code::DisconnectSubCommands::DISCONNECT: { + std::vector data; + data.resize(sizeof(Code::DisconnectSubCommands) + sizeof(ConnectionId)); + offset = sizeof(Code::DisconnectSubCommands); + SCU_DO_AND_ASSERT(scorpio_utils::network::host_to_network(connection_id, data, offset), + "Failed to serialize connection_id for DISCONNECT packet"); if (auto connection_opt = get_connection(udp_data.ip, udp_data.port); - connection_opt != nullptr && connection_opt->is_alive()) { + connection_opt != nullptr && connection_opt->is_alive() && connection_opt->connection_id() == connection_id) { + data[0] = AS_BYTE(Code::DisconnectSubCommands::ACCEPTED); send_or_panic(std::nullopt, _mock_sequence_number, Code::DISCONNECT, - udp_data.ip, udp_data.port, - { AS_BYTE(Code::DisconnectSubCommands::ACCEPTED) }, + udp_data.ip, udp_data.port, data, "Failed to send DISCONNECT ACCEPTED response"); connection_opt->close(false); } else { // Handle disconnect for non-existing connection + data[0] = AS_BYTE(Code::DisconnectSubCommands::ALREADY_DISCONNECTED); send_or_panic(std::nullopt, _mock_sequence_number, Code::DISCONNECT, - udp_data.ip, udp_data.port, - { AS_BYTE(Code::DisconnectSubCommands::ALREADY_DISCONNECTED) }, + udp_data.ip, udp_data.port, data, "Failed to send DISCONNECT ACCEPTED response for non-existing connection"); } } break; @@ -546,12 +595,17 @@ SCU_HOT void ScorpioUdp::process_packet(UdpData udp_data) { void ScorpioUdp::pull_awaiting_connections(std::weak_ptr connection_weak) { if (auto connection = connection_weak.lock()) { if (get_connection(connection->remote_ip(), connection->remote_port())) { - connection->panic("Connection already exists"); + connection->panic_soft("Connection already exists"); } else { SCU_ASSERT(connection->state() == ScorpioUdpConnection::State::NEW, "Connection in invalid state"); + std::vector data(sizeof(Code::ConnectSubCommands) + sizeof(ConnectionId)); + data[0] = AS_BYTE(Code::ConnectSubCommands::CONNECT); + size_t offset = sizeof(Code::ConnectSubCommands); + SCU_DO_AND_ASSERT(scorpio_utils::network::host_to_network(connection->connection_id(), data, offset), + "Failed to serialize connection_id for CONNECT packet"); send_or_panic(std::nullopt, _mock_sequence_number, Code::CONNECT, connection->remote_ip(), - connection->remote_port(), { AS_BYTE(Code::ConnectionSubCommands::CONNECT) }); + connection->remote_port(), data, "Failed to send CONNECT packet"); connection->_state.store(ScorpioUdpConnection::State::CONNECTING, std::memory_order_relaxed); auto address_pair = std::make_pair(connection->remote_ip(), connection->remote_port()); _connections.insert({ address_pair, std::move(connection) }); @@ -700,7 +754,8 @@ void ScorpioUdp::send_or_panic( std::shared_ptr ScorpioUdp::connect( Ipv4 ip, Port port) { - std::shared_ptr connection(new ScorpioUdpConnection(ip, port, shared_from_this())); + std::shared_ptr connection(new ScorpioUdpConnection(ip, port, get_random_number(), + shared_from_this())); connection->_start_signal.notify(100000); _awaiting_connections_channel.send(connection); return connection; @@ -708,9 +763,12 @@ std::shared_ptr ScorpioUdp::connect( // ========================= ScorpioUdpConnection implementation ======================= -ScorpioUdpConnection::ScorpioUdpConnection(Ipv4 remote_ip, Port remote_port, std::shared_ptr parent) +ScorpioUdpConnection::ScorpioUdpConnection( + Ipv4 remote_ip, Port remote_port, ConnectionId connection_id, + std::shared_ptr parent) : _remote_ip(remote_ip), _remote_port(remote_port), + _connection_id{connection_id}, _sequence_number(0), _panic(false), _state(State::NEW), @@ -1098,11 +1156,19 @@ void ScorpioUdpConnection::processing_thread() { std::this_thread::sleep_for(std::chrono::nanoseconds(SCU_UDP_HEARTBEAT_PERIOD / 4)); } std::this_thread::sleep_for(std::chrono::nanoseconds(SCU_UDP_HEARTBEAT_PERIOD)); - while (SCU_LIKELY((self = self_weak.lock()) && !_stop.load(std::memory_order_relaxed)) && - _state.load(std::memory_order_relaxed) == State::CONNECTING) { - SCU_DEFER([&self] { self.reset(); }); - send_or_panic(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::CONNECT) }); - std::this_thread::sleep_for(std::chrono::nanoseconds(SCU_UDP_HEARTBEAT_PERIOD)); + { + std::vector connect_data; + connect_data.resize(sizeof(Code::ConnectSubCommands) + sizeof(ConnectionId)); + connect_data[0] = AS_BYTE(Code::ConnectSubCommands::CONNECT); + size_t offset = 1; + SCU_DO_AND_ASSERT(host_to_network(_connection_id, connect_data, offset), + "Failed to convert connection ID to network format"); + while (SCU_LIKELY((self = self_weak.lock()) && !_stop.load(std::memory_order_relaxed)) && + _state.load(std::memory_order_relaxed) == State::CONNECTING) { + SCU_DEFER([&self] { self.reset(); }); + send_or_panic(Code::CONNECT, connect_data); + std::this_thread::sleep_for(std::chrono::nanoseconds(SCU_UDP_HEARTBEAT_PERIOD)); + } } if (SCU_UNLIKELY((self = self_weak.lock()) == nullptr || _stop.load(std::memory_order_relaxed))) { return; @@ -1133,6 +1199,19 @@ void ScorpioUdpConnection::processing_thread() { } } +SCU_COLD void ScorpioUdpConnection::panic_soft(std::string&& message) { + std::unique_lock lock(_panic_mutex, std::try_to_lock); + if (!lock.owns_lock()) { + // No need for waiting since panic_soft shall be called from the socket thread + return; + } + SCU_LOG_FATAL(_logger, "Connection panic: {}", message); + _panic_message = std::move(message); + _panic.store(true, std::memory_order_release); + _state.store(State::ERROR, std::memory_order_relaxed); + _stop.store(true, std::memory_order_relaxed); +} + SCU_COLD SCU_NORETURN void ScorpioUdpConnection::panic(std::string&& message) { std::unique_lock lock(_panic_mutex, std::try_to_lock); if (!lock.owns_lock()) { @@ -1199,8 +1278,13 @@ bool ScorpioUdpConnection::close(bool send_disconnect) { } SCU_LOG_INFO(_logger, "Closing connection {}:{}", _remote_ip.str(), _remote_port); if (send_disconnect) { - send(Code::DISCONNECT, { AS_BYTE(Code::DisconnectSubCommands::DISCONNECT) }, - std::nullopt, std::nullopt); + std::vector disconnect_data; + disconnect_data.resize(sizeof(Code::DisconnectSubCommands) + sizeof(ConnectionId)); + disconnect_data[0] = AS_BYTE(Code::DisconnectSubCommands::DISCONNECT); + size_t offset = 1; + SCU_DO_AND_ASSERT(host_to_network(_connection_id, disconnect_data, offset), + "Failed to convert connection ID to network format for DISCONNECT packet"); + send(Code::DISCONNECT, disconnect_data, std::nullopt, std::nullopt); } _stop.store(true, std::memory_order_relaxed); _state.store(State::CLOSED, std::memory_order_relaxed); diff --git a/test/network/test_scorpio_udp.cpp b/test/network/test_scorpio_udp.cpp index 2c6064d..dabb4e2 100644 --- a/test/network/test_scorpio_udp.cpp +++ b/test/network/test_scorpio_udp.cpp @@ -61,10 +61,12 @@ auto get_client_server_connection(scorpio_utils::network::Port port) { } TEST(ScorpioUdp, BasicSend) { + auto sender_logger = std::make_shared>("sender_log_basic.txt"); + auto receiver_logger = std::make_shared>("receiver_log_basic.txt"); const auto port = PORT; std::atomic server_ready(false); - std::thread server([&server_ready, port]() { - auto socket = ScorpioUdp::create(); + std::thread server([&server_ready, port, receiver_logger]() { + auto socket = ScorpioUdp::create(std::move(receiver_logger)); ASSERT_TRUE(socket->start()); std::ignore = socket->set_auto_accept(true); ASSERT_TRUE(socket->listen(localhost, port)); @@ -84,7 +86,7 @@ TEST(ScorpioUdp, BasicSend) { EXPECT_EQ(data.value(), (std::vector{ 'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd', '!' })); std::this_thread::sleep_for(std::chrono::seconds(5)); }); - const auto socket = ScorpioUdp::create(); + const auto socket = ScorpioUdp::create(sender_logger); ASSERT_TRUE(socket->start()); while (!server_ready.load(std::memory_order_relaxed)) { std::this_thread::yield(); diff --git a/test/network/test_scorpio_udp_framework.cpp b/test/network/test_scorpio_udp_framework.cpp index 9c1ce7c..3f0a4b7 100644 --- a/test/network/test_scorpio_udp_framework.cpp +++ b/test/network/test_scorpio_udp_framework.cpp @@ -336,9 +336,9 @@ static bool is_background_packet( return false; } if (command == Code::CONNECT && - packet[1] == static_cast(Code::ConnectionSubCommands::CONNECT) && + packet[1] == static_cast(Code::ConnectSubCommands::CONNECT) && !(expected_command == Code::CONNECT && expected.size() >= 2 && - expected[1] == static_cast(Code::ConnectionSubCommands::CONNECT))) { + expected[1] == static_cast(Code::ConnectSubCommands::CONNECT))) { return true; } if (command == Code::CREATE_STREAM && @@ -1250,9 +1250,9 @@ auto create_connection(std::vector& events) { events.push_back({ WHERE, 0, std::make_unique() }); events.push_back({ WHERE, 0, connection_handle->create_connection(Ipv4(127, 0, 0, 1), 12345) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::CONNECT) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::ACCEPTED) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::ACCEPTED) })) }); events.push_back({ WHERE, 0, connection_handle->connection_is_alive(true) }); return connection_handle; } @@ -1290,9 +1290,9 @@ TEST_F(ScorpioUdpTester, reconnect_same_address_before_disconnect_accept) { auto conn2 = ConnectionHandle::create(); events.push_back({ WHERE, 0, conn2->create_connection(peer, port) }); events.push_back({ WHERE, 0, std::make_unique(peer, port, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::CONNECT) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); events.push_back({ WHERE, 0, std::make_unique(peer, port, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::ACCEPTED) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::ACCEPTED) })) }); events.push_back({ WHERE, 0, conn2->connection_is_alive(true) }); events.push_back({ WHERE, 0, conn1->connection_is_alive(false) }); @@ -1312,9 +1312,9 @@ TEST_F(ScorpioUdpTester, accept_connection_and_close) { events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 10001) }); events.push_back({ WHERE, 0, std::make_unique(true) }); events.push_back({ WHERE, TICK_TIME, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::CONNECT) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::ACCEPTED) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::ACCEPTED) })) }); events.push_back({ WHERE, 0, connection_handle->get_connection(true) }); events.push_back({ WHERE, 0, connection_handle->connection_is_alive(true) }); events.push_back({ WHERE, 0, connection_handle->close_connection(true) }); @@ -1330,9 +1330,9 @@ TEST_F(ScorpioUdpTester, reject_connection) { events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 10001) }); events.push_back({ WHERE, 0, std::make_unique(false) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::CONNECT) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::REJECTED) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::REJECTED) })) }); execute_test(events); } @@ -1359,9 +1359,9 @@ TEST_F(ScorpioUdpTester, connect_rejected_by_peer) { events.push_back({ WHERE, 0, std::make_unique() }); events.push_back({ WHERE, 0, connection_handle->create_connection(Ipv4(127, 0, 0, 1), 12345) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::CONNECT) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::REJECTED) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::REJECTED) })) }); events.push_back({ WHERE, 0, connection_handle->connection_is_alive(false) }); // Explicit local close synchronously joins the connection's processing thread // before the test ends, avoiding a race in scorpio_udp.cpp:1104 where the @@ -1959,9 +1959,9 @@ TEST_F(ScorpioUdpTester, connect_handles_already_connected_response) { events.push_back({ WHERE, 0, std::make_unique() }); events.push_back({ WHERE, 0, connection_handle->create_connection(Ipv4(127, 0, 0, 1), 12345) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::CONNECT) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectionSubCommands::ALREADY_CONNECTED) })) }); + generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::ALREADY_CONNECTED) })) }); // ExpectPacketTimeout silently drains the CONNECT retransmits via the // is_background_packet filter, so this only succeeds if a HEARTBEAT is // actually emitted (i.e., we reached State::CONNECTED). diff --git a/test/network/test_scorpio_udp_gmock.cpp b/test/network/test_scorpio_udp_gmock.cpp index b199f47..0a50d85 100644 --- a/test/network/test_scorpio_udp_gmock.cpp +++ b/test/network/test_scorpio_udp_gmock.cpp @@ -99,7 +99,7 @@ TEST(ScorpioUdpGmock, Listen) { // generate_packets( // std::nullopt, // seq_num, Code::CONNECT, -// { SCU_AS(uint8_t, Code::ConnectionSubCommands::CONNECT) } +// { SCU_AS(uint8_t, Code::ConnectSubCommands::CONNECT) } // ); // ASSERT_TRUE(connect_packet_opt.has_value()); // ASSERT_EQ(connect_packet_opt->second.size(), 1); @@ -110,7 +110,7 @@ TEST(ScorpioUdpGmock, Listen) { // generate_packets( // std::nullopt, // seq_num, Code::CONNECT, -// { SCU_AS(uint8_t, Code::ConnectionSubCommands::ACCEPTED) } +// { SCU_AS(uint8_t, Code::ConnectSubCommands::ACCEPTED) } // ); // ASSERT_TRUE(connect_accept_opt.has_value()); // ASSERT_EQ(connect_accept_opt->second.size(), 1); From 3811756809a209f33ad437ae743b39fd5af601b8 Mon Sep 17 00:00:00 2001 From: igorosky Date: Thu, 18 Jun 2026 09:02:27 +0200 Subject: [PATCH 2/4] Docs & tests update Signed-off-by: igorosky --- SCORPIO_UDP_PROTOCOL.md | 25 +- test/network/test_scorpio_udp_framework.cpp | 313 +++++++++++++------- 2 files changed, 238 insertions(+), 100 deletions(-) diff --git a/SCORPIO_UDP_PROTOCOL.md b/SCORPIO_UDP_PROTOCOL.md index 989d656..26a6912 100644 --- a/SCORPIO_UDP_PROTOCOL.md +++ b/SCORPIO_UDP_PROTOCOL.md @@ -125,7 +125,7 @@ Last packet: ### Subcommands -Each command packet carries a 1-byte subcommand as the first payload byte. +Each command packet carries a 1-byte subcommand as the first payload byte. `CONNECT` and `DISCONNECT` packets additionally carry an 8-byte [connection id](#connection-identity) immediately after the subcommand byte (every subcommand of both commands). **PING subcommands:** @@ -154,6 +154,17 @@ Each command packet carries a 1-byte subcommand as the first payload byte. | 2 | REJECTED | responder -> initiator | | 3 | ALREADY_DISCONNECTED | responder -> initiator | +Both CONNECT and DISCONNECT packets lay out their payload as the subcommand byte followed by the 8-byte connection id: + +``` ++------+---------------+-----------------------------+ +| cmd | subcommand | connection id | +| 1 B | 1 B | 8 B (uint64, big-endian) | ++------+---------------+-----------------------------+ +``` + +The responder always echoes back the connection id it received, unchanged. See [Connection identity](#connection-identity). + **CREATE_STREAM subcommands:** | Value | Name | Direction | @@ -232,6 +243,18 @@ sequenceDiagram end ``` +### Connection identity + +Every connection carries a **connection id**: a `uint64_t` (`ConnectionId`) chosen at random by the initiator when `connect()` is called. It is sent in every CONNECT and DISCONNECT packet (all subcommands) and the acceptor stores and echoes it back unchanged. + +The connection id is **not** a demultiplexing key - a connection is still identified solely by its remote `ip:port`, and there is at most one connection per peer address. The id exists only as a guard against a peer that dies and restarts so fast that the other side never noticed the old connection went stale. + +How the id is used: + +- **CONNECT for an existing connection.** If the stored id matches, the acceptor replies `ALREADY_CONNECTED`. If it differs (a restarted peer reusing the same address with a fresh id), the acceptor **soft-panics its own stale connection and sends nothing back**. The old connection drops to `ERROR` (no longer alive) and is evicted on the next lookup; the initiator keeps retrying CONNECT every heartbeat and gets accepted once the stale entry is gone. +- **ACCEPTED / REJECTED / ALREADY_CONNECTED responses.** Ignored if the id does not match the local connection (`ACCEPTED` with a mismatched id also draws an `ERROR` reply). This prevents a late reply meant for a previous incarnation from advancing the current connection's state. +- **DISCONNECT.** Accepted (replied `ACCEPTED`, connection closed) only when the id matches a live connection for that address; otherwise the responder replies `ALREADY_DISCONNECTED`. A late DISCONNECT response addressed to a previous incarnation is therefore ignored and cannot tear down a freshly re-established connection. + ## Stream Lifecycle ### State machine diff --git a/test/network/test_scorpio_udp_framework.cpp b/test/network/test_scorpio_udp_framework.cpp index 3f0a4b7..7651907 100644 --- a/test/network/test_scorpio_udp_framework.cpp +++ b/test/network/test_scorpio_udp_framework.cpp @@ -24,6 +24,7 @@ using scorpio_utils::Expected; using scorpio_utils::network::Code; +using scorpio_utils::network::ConnectionId; using scorpio_utils::network::Ipv4; using scorpio_utils::network::Port; using scorpio_utils::network::SeqNumber; @@ -96,6 +97,27 @@ SCU_ALWAYS_INLINE void write_be32(std::vector& dst, uint32_t v) { dst.push_back(static_cast(v & 0xff)); } +SCU_ALWAYS_INLINE void write_be64(std::vector& dst, uint64_t v) { + for (int shift = 56; shift >= 0; shift -= 8) { + dst.push_back(static_cast((v >> shift) & 0xffu)); + } +} + +// A CONNECT/DISCONNECT payload: 1-byte subcommand followed by the 8-byte +// big-endian connection id, matching the wire format the implementation expects. +std::vector id_payload(uint8_t subcommand, ConnectionId connection_id) { + std::vector payload; + payload.reserve(1 + sizeof(ConnectionId)); + payload.push_back(subcommand); + write_be64(payload, connection_id); + return payload; +} + +// Connection id used by tests that originate a CONNECT/DISCONNECT themselves +// (i.e. where the test, not ScorpioUdp, picks the id). For connections that +// ScorpioUdp initiates the id is random and must be read from the connection. +constexpr ConnectionId TEST_PEER_CONNECTION_ID = 0x1122334455667788ULL; + std::vector generate_heartbeat_body( StreamNumber stream_id, SeqNumber initial_end, const std::vector>& held_ranges = { }) { @@ -350,6 +372,39 @@ static bool is_background_packet( return false; } +// Drains the send queue, skipping background packets, until a packet matching +// `expected` (exact ip/port/bytes) is seen or `max_attempts` time ticks elapse. +// Shared by ExpectPacketTimeout and the connection-id-aware lazy events below. +static Expected drain_until_match( + UdpSocket& socket, Ipv4 remote_ip, Port remote_port, + const std::vector& expected, int64_t period, size_t max_attempts) { + const auto time_provider = get_time_provider(); + for (size_t attempt = 0; attempt < max_attempts; ++attempt) { + while (auto result = socket.get_from_send_queue()) { + auto [ip, port, data] = *std::move(result); + if (is_background_packet(data, expected)) { + continue; + } + if (SCU_UNLIKELY(ip != remote_ip)) { + return Unexpected("Expected remote IP "s + remote_ip.str() + " but got " + + std::to_string(ip.ip())); + } + if (SCU_UNLIKELY(port != remote_port)) { + return Unexpected("Expected remote port " + std::to_string(remote_port) + + " but got " + std::to_string(port)); + } + if (SCU_UNLIKELY(data != expected)) { + return Unexpected("Expected data " + packet_to_string(expected) + " but got " + packet_to_string(data)); + } + return Success(); + } + time_provider->advance_time(period); + std::this_thread::sleep_for(std::chrono::nanoseconds(period)); + } + return Unexpected("No packet received after "s + std::to_string(max_attempts) + + " attempts (period=" + std::to_string(period) + "ns)"); +} + class ExpectPacketTimeout final : public EventInTime { const Ipv4 _remote_ip; const Port _remote_port; @@ -369,31 +424,7 @@ class ExpectPacketTimeout final : public EventInTime { UdpSocket& socket, std::shared_ptr ) override { - const auto time_provider = get_time_provider(); - for (size_t attempt = 0; attempt < _max_attempts; ++attempt) { - while (auto result = socket.get_from_send_queue()) { - auto [ip, port, data] = *std::move(result); - if (is_background_packet(data, _data)) { - continue; - } - if (SCU_UNLIKELY(ip != _remote_ip)) { - return Unexpected("Expected remote IP "s + _remote_ip.str() + " but got " + - std::to_string(ip.ip())); - } - if (SCU_UNLIKELY(port != _remote_port)) { - return Unexpected("Expected remote port " + std::to_string(_remote_port) + - " but got " + std::to_string(port)); - } - if (SCU_UNLIKELY(data != _data)) { - return Unexpected("Expected data " + packet_to_string(_data) + " but got " + packet_to_string(data)); - } - return Success(); - } - time_provider->advance_time(_period); - std::this_thread::sleep_for(std::chrono::nanoseconds(_period)); - } - return Unexpected("No packet received after "s + std::to_string(_max_attempts) + - " attempts (period=" + std::to_string(_period) + "ns)"); + return drain_until_match(socket, _remote_ip, _remote_port, _data, _period, _max_attempts); } std::string name() override { return "ExpectPacketTimeout(" + _remote_ip.str() + ":" + std::to_string(_remote_port) + ", " + @@ -815,6 +846,93 @@ class ConnectionHandle final : public std::enable_shared_from_this( new ConnectionIsPanic(shared_from_this(), expect_panic, period, max_attempts)); } + + // Injects a CONNECT/DISCONNECT packet carrying THIS connection's id. The id is + // only known once connect()/accept has run, so it is read from the connection + // at execute time rather than baked into the packet at construction. + class SendIdPacket final : public EventInTime { + friend class ConnectionHandle; + const std::shared_ptr _handle; + const Code::Values _command; + const uint8_t _subcommand; + + SendIdPacket(std::shared_ptr handle, Code::Values command, uint8_t subcommand) + : _handle(std::move(handle)), _command(command), _subcommand(subcommand) { } + +public: + Expected execute( + int64_t, + UdpSocket& socket, + std::shared_ptr + ) override { + SCU_ASSERT(_handle->_connection.has_value(), "Handle does not contain a connection"); + const auto& conn = *(_handle->_connection); + auto packet = generate_single_packet(_command, id_payload(_subcommand, conn->connection_id())); + const Expected info{ { packet.size(), conn->remote_ip(), conn->remote_port() } }; + socket.add_to_receive_queue(info, std::move(packet)); + return Success(); + } + std::string name() override { + return "SendIdPacket(cmd=" + std::to_string(static_cast(_command)) + + ", sub=" + std::to_string(static_cast(_subcommand)) + ")"; + } + ~SendIdPacket() override = default; + }; + + std::unique_ptr send_connect(Code::ConnectSubCommands sub) { + return std::unique_ptr( + new SendIdPacket(shared_from_this(), Code::CONNECT, static_cast(sub))); + } + std::unique_ptr send_disconnect(Code::DisconnectSubCommands sub) { + return std::unique_ptr( + new SendIdPacket(shared_from_this(), Code::DISCONNECT, static_cast(sub))); + } + + // Expects a CONNECT/DISCONNECT packet carrying THIS connection's id, draining + // background retransmits while waiting (same logic as ExpectPacketTimeout). + class ExpectIdPacket final : public EventInTime { + friend class ConnectionHandle; + const std::shared_ptr _handle; + const Code::Values _command; + const uint8_t _subcommand; + const int64_t _period; + const size_t _max_attempts; + + ExpectIdPacket( + std::shared_ptr handle, Code::Values command, uint8_t subcommand, + int64_t period, size_t max_attempts) + : _handle(std::move(handle)), _command(command), _subcommand(subcommand), + _period(period), _max_attempts(max_attempts) { } + +public: + Expected execute( + int64_t, + UdpSocket& socket, + std::shared_ptr + ) override { + SCU_ASSERT(_handle->_connection.has_value(), "Handle does not contain a connection"); + const auto& conn = *(_handle->_connection); + const auto expected = generate_single_packet(_command, id_payload(_subcommand, conn->connection_id())); + return drain_until_match(socket, conn->remote_ip(), conn->remote_port(), + expected, _period, _max_attempts); + } + std::string name() override { + return "ExpectIdPacket(cmd=" + std::to_string(static_cast(_command)) + + ", sub=" + std::to_string(static_cast(_subcommand)) + ")"; + } + ~ExpectIdPacket() override = default; + }; + + std::unique_ptr expect_connect( + Code::ConnectSubCommands sub, int64_t period = TICK_TIME, size_t max_attempts = 20) { + return std::unique_ptr( + new ExpectIdPacket(shared_from_this(), Code::CONNECT, static_cast(sub), period, max_attempts)); + } + std::unique_ptr expect_disconnect( + Code::DisconnectSubCommands sub, int64_t period = TICK_TIME, size_t max_attempts = 20) { + return std::unique_ptr( + new ExpectIdPacket(shared_from_this(), Code::DISCONNECT, static_cast(sub), period, max_attempts)); + } }; class StreamHandle final : public std::enable_shared_from_this { @@ -1249,26 +1367,21 @@ auto create_connection(std::vector& events) { std::shared_ptr connection_handle = ConnectionHandle::create(); events.push_back({ WHERE, 0, std::make_unique() }); events.push_back({ WHERE, 0, connection_handle->create_connection(Ipv4(127, 0, 0, 1), 12345) }); - events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); - events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::ACCEPTED) })) }); + events.push_back({ WHERE, 0, connection_handle->expect_connect(Code::ConnectSubCommands::CONNECT) }); + events.push_back({ WHERE, 0, connection_handle->send_connect(Code::ConnectSubCommands::ACCEPTED) }); events.push_back({ WHERE, 0, connection_handle->connection_is_alive(true) }); return connection_handle; } -void close_connection(std::vector& events) { - events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::DISCONNECT, { AS_BYTE(Code::DisconnectSubCommands::DISCONNECT) })) }); - events.push_back({ WHERE, 0, - std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::DISCONNECT, { AS_BYTE(Code::DisconnectSubCommands::ACCEPTED) })) }); +void close_connection(std::vector& events, const std::shared_ptr& connection_handle) { + events.push_back({ WHERE, 0, connection_handle->send_disconnect(Code::DisconnectSubCommands::DISCONNECT) }); + events.push_back({ WHERE, 0, connection_handle->expect_disconnect(Code::DisconnectSubCommands::ACCEPTED) }); } TEST_F(ScorpioUdpTester, connect_and_get_closed) { std::vector events; auto connection_handle = create_connection(events); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1283,23 +1396,20 @@ TEST_F(ScorpioUdpTester, reconnect_same_address_before_disconnect_accept) { // Quick local disconnect. close() emits DISCONNECT/DISCONNECT; drain it so the // later CONNECT expectation does not trip over it. events.push_back({ WHERE, 0, conn1->close_connection(true) }); - events.push_back({ WHERE, 0, std::make_unique(peer, port, - generate_single_packet(Code::DISCONNECT, { AS_BYTE(Code::DisconnectSubCommands::DISCONNECT) })) }); + events.push_back({ WHERE, 0, conn1->expect_disconnect(Code::DisconnectSubCommands::DISCONNECT) }); // Reconnect to the SAME ip:port before the peer's DISCONNECT ACCEPT is delivered. + // conn2 gets its own (different) random connection id. auto conn2 = ConnectionHandle::create(); events.push_back({ WHERE, 0, conn2->create_connection(peer, port) }); - events.push_back({ WHERE, 0, std::make_unique(peer, port, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); - events.push_back({ WHERE, 0, std::make_unique(peer, port, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::ACCEPTED) })) }); + events.push_back({ WHERE, 0, conn2->expect_connect(Code::ConnectSubCommands::CONNECT) }); + events.push_back({ WHERE, 0, conn2->send_connect(Code::ConnectSubCommands::ACCEPTED) }); events.push_back({ WHERE, 0, conn2->connection_is_alive(true) }); events.push_back({ WHERE, 0, conn1->connection_is_alive(false) }); - // The late DISCONNECT ACCEPT for the old connection finally arrives. It must be - // ignored and must not disturb the new connection. - events.push_back({ WHERE, 0, std::make_unique(peer, port, - generate_single_packet(Code::DISCONNECT, { AS_BYTE(Code::DisconnectSubCommands::ACCEPTED) })) }); + // The late DISCONNECT ACCEPT for the old connection (conn1's id) finally arrives. + // It must be ignored and must not disturb the new connection. + events.push_back({ WHERE, 0, conn1->send_disconnect(Code::DisconnectSubCommands::ACCEPTED) }); events.push_back({ WHERE, 0, conn2->connection_is_alive(true) }); execute_test(events); @@ -1312,14 +1422,18 @@ TEST_F(ScorpioUdpTester, accept_connection_and_close) { events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 10001) }); events.push_back({ WHERE, 0, std::make_unique(true) }); events.push_back({ WHERE, TICK_TIME, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); + generate_single_packet(Code::CONNECT, + id_payload(AS_BYTE(Code::ConnectSubCommands::CONNECT), TEST_PEER_CONNECTION_ID))) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::ACCEPTED) })) }); + generate_single_packet(Code::CONNECT, + id_payload(AS_BYTE(Code::ConnectSubCommands::ACCEPTED), TEST_PEER_CONNECTION_ID))) }); events.push_back({ WHERE, 0, connection_handle->get_connection(true) }); events.push_back({ WHERE, 0, connection_handle->connection_is_alive(true) }); events.push_back({ WHERE, 0, connection_handle->close_connection(true) }); + // Incoming connection adopts the peer's id, so the local-close DISCONNECT carries it. events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::DISCONNECT, { AS_BYTE(Code::DisconnectSubCommands::DISCONNECT) })) }); + generate_single_packet(Code::DISCONNECT, + id_payload(AS_BYTE(Code::DisconnectSubCommands::DISCONNECT), TEST_PEER_CONNECTION_ID))) }); execute_test(events); } @@ -1330,9 +1444,11 @@ TEST_F(ScorpioUdpTester, reject_connection) { events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 10001) }); events.push_back({ WHERE, 0, std::make_unique(false) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); + generate_single_packet(Code::CONNECT, + id_payload(AS_BYTE(Code::ConnectSubCommands::CONNECT), TEST_PEER_CONNECTION_ID))) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::REJECTED) })) }); + generate_single_packet(Code::CONNECT, + id_payload(AS_BYTE(Code::ConnectSubCommands::REJECTED), TEST_PEER_CONNECTION_ID))) }); execute_test(events); } @@ -1358,10 +1474,8 @@ TEST_F(ScorpioUdpTester, connect_rejected_by_peer) { auto connection_handle = ConnectionHandle::create(); events.push_back({ WHERE, 0, std::make_unique() }); events.push_back({ WHERE, 0, connection_handle->create_connection(Ipv4(127, 0, 0, 1), 12345) }); - events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); - events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::REJECTED) })) }); + events.push_back({ WHERE, 0, connection_handle->expect_connect(Code::ConnectSubCommands::CONNECT) }); + events.push_back({ WHERE, 0, connection_handle->send_connect(Code::ConnectSubCommands::REJECTED) }); events.push_back({ WHERE, 0, connection_handle->connection_is_alive(false) }); // Explicit local close synchronously joins the connection's processing thread // before the test ends, avoiding a race in scorpio_udp.cpp:1104 where the @@ -1392,7 +1506,7 @@ TEST_F(ScorpioUdpTester, heartbeat_emitted_periodically) { // A bare heartbeat with no streams: just the code byte (HEARTBEAT|FIRST=0x45) events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, generate_single_packet(Code::HEARTBEAT, { })) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1424,7 +1538,7 @@ TEST_F(ScorpioUdpTester, outgoing_stream_creation_accepted) { std::vector events; auto connection_handle = create_connection(events); auto stream_handle = create_outgoing_reliable_stream(events, connection_handle, 1, 16); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1442,7 +1556,7 @@ TEST_F(ScorpioUdpTester, outgoing_stream_creation_rejected) { generate_single_packet(Code::CREATE_STREAM, create_stream_payload(1, qos, Code::CreateStreamSubCommands::REJECT))) }); events.push_back({ WHERE, 0, stream_handle->stream_is_active(false) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1459,7 +1573,7 @@ TEST_F(ScorpioUdpTester, incoming_stream_rejected_when_not_auto_accept) { events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, generate_single_packet(Code::CREATE_STREAM, create_stream_payload(7, qos, Code::CreateStreamSubCommands::REJECT))) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1487,7 +1601,7 @@ TEST_F(ScorpioUdpTester, stream_close_initiated_by_peer) { generate_single_packet(Code::CLOSE_STREAM, close_stream_payload(1, Code::CloseStreamSubCommands::CLOSED), 0, 1)) }); events.push_back({ WHERE, 0, stream_handle->stream_is_active(false) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1530,7 +1644,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_receive_in_order) { std::vector payload{ static_cast(0x10 + seq), 0x20, 0x30 }; events.push_back({ WHERE, 0, stream_handle->stream_receive(payload) }); } - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1552,7 +1666,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_receive_out_of_order_is_reordered) { for (SeqNumber seq = 0; seq < 3; ++seq) { events.push_back({ WHERE, 0, stream_handle->stream_receive(payloads[seq]) }); } - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1572,7 +1686,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_duplicate_ignored) { events.push_back({ WHERE, 0, stream_handle->stream_receive(payload1) }); // No further data — the duplicate of seq 0 must have been dropped. events.push_back({ WHERE, 0, stream_handle->stream_expect_no_receive() }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1585,7 +1699,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_send_emits_stream_data) { events.push_back({ WHERE, 0, stream_handle->stream_send(payload) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, stream_data_packet(1, 0, payload)) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1603,7 +1717,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_send_fragmented_emits_multiple_packets) events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, std::move(packet)) }); } - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1636,7 +1750,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_retransmits_on_heartbeat_gap) { generate_single_packet(Code::HEARTBEAT, hb_body)) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, stream_data_packet(1, 1, payloads[1])) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1664,7 +1778,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_heartbeat_no_gap_no_retransmission) { // No STREAM_DATA should reappear (heartbeats are ignored by the filter). events.push_back({ WHERE, 0, std::make_unique(TICK_TIME, 10, ExpectNoPacket::Filter::STREAM_DATA_ONLY) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1698,7 +1812,7 @@ TEST_F(ScorpioUdpTester, unreliable_stream_single_packet_delivered_immediately) events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, stream_data_packet(2, 0, payload)) }); events.push_back({ WHERE, 0, stream_handle->stream_receive(payload) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1720,7 +1834,7 @@ TEST_F(ScorpioUdpTester, unreliable_stream_fragments_reassembled_out_of_order) { packets[idx]) }); } events.push_back({ WHERE, 0, stream_handle->stream_receive(payload) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1738,7 +1852,7 @@ TEST_F(ScorpioUdpTester, unreliable_stream_missing_middle_fragment_not_delivered packets[2]) }); // Stream must not deliver anything. events.push_back({ WHERE, 0, stream_handle->stream_expect_no_receive() }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1787,7 +1901,7 @@ TEST_F(ScorpioUdpTester, two_reliable_streams_independent_data) { events.push_back({ WHERE, 0, stream_a->stream_expect_no_receive() }); events.push_back({ WHERE, 0, stream_b->stream_expect_no_receive() }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1827,7 +1941,7 @@ TEST_F(ScorpioUdpTester, reliable_and_unreliable_concurrent) { events.push_back({ WHERE, 0, rel_stream->stream_receive(rel_payload) }); events.push_back({ WHERE, 0, unrel_stream->stream_receive(unrel_payload) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1851,7 +1965,7 @@ TEST_F(ScorpioUdpTester, heartbeat_nonexistent_stream_sends_already_closed) { write_be16(close_body, static_cast(99)); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, generate_single_packet(Code::CLOSE_STREAM, close_body, 0, 99)) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1916,7 +2030,7 @@ TEST_F(ScorpioUdpTester, heartbeat_nonexistent_stream_between_retransmit_streams events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, stream_data_packet(5, 1, payloads_b[1])) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1931,7 +2045,7 @@ TEST_F(ScorpioUdpTester, already_closed_response_closes_active_stream) { generate_single_packet(Code::CLOSE_STREAM, close_stream_payload(1, Code::CloseStreamSubCommands::ALREADY_CLOSED), 0, 1)) }); events.push_back({ WHERE, 0, stream_handle->stream_is_active(false) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -1958,10 +2072,8 @@ TEST_F(ScorpioUdpTester, connect_handles_already_connected_response) { std::vector events; events.push_back({ WHERE, 0, std::make_unique() }); events.push_back({ WHERE, 0, connection_handle->create_connection(Ipv4(127, 0, 0, 1), 12345) }); - events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::CONNECT) })) }); - events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::CONNECT, { AS_BYTE(Code::ConnectSubCommands::ALREADY_CONNECTED) })) }); + events.push_back({ WHERE, 0, connection_handle->expect_connect(Code::ConnectSubCommands::CONNECT) }); + events.push_back({ WHERE, 0, connection_handle->send_connect(Code::ConnectSubCommands::ALREADY_CONNECTED) }); // ExpectPacketTimeout silently drains the CONNECT retransmits via the // is_background_packet filter, so this only succeeds if a HEARTBEAT is // actually emitted (i.e., we reached State::CONNECTED). @@ -2003,7 +2115,7 @@ TEST_F(ScorpioUdpTester, outgoing_stream_rejected_transitions_out_of_creating) { // After the state transition, no further CREATE_STREAM:CREATE retransmits may occur. events.push_back({ WHERE, 0, std::make_unique(TICK_TIME, 10, ExpectNoPacket::Filter::CREATE_STREAM_CREATE_ONLY) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2025,7 +2137,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_fragmented_receive_in_order) { events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, p) }); } events.push_back({ WHERE, 0, stream_handle->stream_receive(payload) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2048,7 +2160,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_fragmented_receive_out_of_order) { events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, packets[i]) }); } events.push_back({ WHERE, 0, stream_handle->stream_receive(payload) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2074,7 +2186,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_two_consecutive_fragmented_messages) { events.push_back({ WHERE, 0, stream_handle->stream_receive(payload_a) }); events.push_back({ WHERE, 0, stream_handle->stream_receive(payload_b) }); events.push_back({ WHERE, 0, stream_handle->stream_is_panic(false) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2089,7 +2201,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_too_new_seq_panics_stream) { events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, stream_data_packet(1, 3000, { 0x01, 0x02 })) }); events.push_back({ WHERE, 0, stream_handle->stream_is_panic(true) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2113,7 +2225,7 @@ TEST_F(ScorpioUdpTester, unreliable_stream_inconsistent_frames_left_panics) { events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, packets[0]) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, packets[1]) }); events.push_back({ WHERE, 0, stream_handle->stream_is_panic(true) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2155,7 +2267,7 @@ TEST_F(ScorpioUdpTester, heartbeat_unknown_stream_truncated_does_not_crash) { generate_single_packet(Code::HEARTBEAT, body)) }); events.push_back({ WHERE, 0, connection_handle->connection_is_alive(true) }); events.push_back({ WHERE, 0, connection_handle->connection_is_panic(false) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2173,7 +2285,7 @@ TEST_F(ScorpioUdpTester, incoming_stream_data_before_create_is_dropped) { // We should not emit any STREAM_DATA in response. events.push_back({ WHERE, 0, std::make_unique(TICK_TIME, 5, ExpectNoPacket::Filter::STREAM_DATA_ONLY) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2211,7 +2323,7 @@ TEST_F(ScorpioUdpTester, peer_close_stream_idempotent_already_closed) { events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, generate_single_packet(Code::CLOSE_STREAM, close_stream_payload(1, Code::CloseStreamSubCommands::ALREADY_CLOSED), 0, 1)) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2234,7 +2346,7 @@ TEST_F(ScorpioUdpTester, bare_heartbeat_keeps_connection_alive) { events.push_back({ WHERE, 0, std::make_unique(SCU_UDP_TIMEOUT * 4 / 5) }); events.push_back({ WHERE, 0, connection_handle->connection_is_panic(false) }); events.push_back({ WHERE, 0, connection_handle->connection_is_alive(true) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2263,7 +2375,7 @@ TEST_F(ScorpioUdpTester, heartbeat_for_unreliable_stream_does_not_emit_spurious_ generate_single_packet(Code::HEARTBEAT, hb_body)) }); events.push_back({ WHERE, 0, std::make_unique(TICK_TIME, 10, ExpectNoPacket::Filter::CLOSE_STREAM_ONLY) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2317,7 +2429,7 @@ TEST_F(ScorpioUdpTester, heartbeat_mixed_reliable_unreliable_does_not_corrupt_pa // No CLOSE_STREAM may follow. events.push_back({ WHERE, 0, std::make_unique(TICK_TIME, 10, ExpectNoPacket::Filter::CLOSE_STREAM_ONLY) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2338,7 +2450,7 @@ TEST_F(ScorpioUdpTester, peer_unsupported_qos_create_stream_should_be_rejected) events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, generate_single_packet(Code::CREATE_STREAM, create_stream_payload(3, unsupported_qos, Code::CreateStreamSubCommands::REJECT))) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2348,10 +2460,11 @@ TEST_F(ScorpioUdpTester, disconnect_to_unknown_peer_responds_already_disconnecte std::vector events; events.push_back({ WHERE, 0, std::make_unique() }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, - generate_single_packet(Code::DISCONNECT, { AS_BYTE(Code::DisconnectSubCommands::DISCONNECT) })) }); + generate_single_packet(Code::DISCONNECT, + id_payload(AS_BYTE(Code::DisconnectSubCommands::DISCONNECT), TEST_PEER_CONNECTION_ID))) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, generate_single_packet(Code::DISCONNECT, - { AS_BYTE(Code::DisconnectSubCommands::ALREADY_DISCONNECTED) })) }); + id_payload(AS_BYTE(Code::DisconnectSubCommands::ALREADY_DISCONNECTED), TEST_PEER_CONNECTION_ID))) }); execute_test(events); } @@ -2391,7 +2504,7 @@ TEST_F(ScorpioUdpTester, outgoing_stream_already_exists_response_keeps_stream_cr events.push_back({ WHERE, 0, stream_handle->stream_is_alive(true) }); events.push_back({ WHERE, 0, stream_handle->stream_is_active(false) }); events.push_back({ WHERE, 0, stream_handle->stream_is_panic(false) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2424,7 +2537,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_retransmit_multiple_gaps) { stream_data_packet(1, 1, payloads[1])) }); events.push_back({ WHERE, 0, std::make_unique(Ipv4(127, 0, 0, 1), 12345, stream_data_packet(1, 3, payloads[3])) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2445,7 +2558,7 @@ TEST_F(ScorpioUdpTester, two_unreliable_streams_independent_data) { events.push_back({ WHERE, 0, stream_b->stream_receive(payload_b) }); events.push_back({ WHERE, 0, stream_a->stream_expect_no_receive() }); events.push_back({ WHERE, 0, stream_b->stream_expect_no_receive() }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2467,7 +2580,7 @@ TEST_F(ScorpioUdpTester, reliable_stream_send_after_peer_close_returns_false) { events.push_back({ WHERE, 0, stream_handle->stream_is_active(false) }); events.push_back({ WHERE, 0, stream_handle->stream_send({ 0x01, 0x02, 0x03 }, /*expect_success=*/ false) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } @@ -2488,6 +2601,8 @@ TEST_F(ScorpioUdpTester, unknown_command_byte_is_logged_and_ignored) { ExpectNoPacket::Filter::STREAM_DATA_ONLY) }); events.push_back({ WHERE, 0, std::make_unique(TICK_TIME, 5, ExpectNoPacket::Filter::CLOSE_STREAM_ONLY) }); - close_connection(events); + close_connection(events, connection_handle); execute_test(events); } + +// TODO(@Igor): FIXME - above comments about bugs are stale update them From bfa36c20e3777309cb8f1cda3e32bfbb07f5783b Mon Sep 17 00:00:00 2001 From: igorosky Date: Thu, 18 Jun 2026 09:08:49 +0200 Subject: [PATCH 3/4] Linting & comments update Signed-off-by: igorosky --- src/network/scorpio_udp.cpp | 4 +- test/network/test_scorpio_udp_framework.cpp | 84 ++++++++------------- 2 files changed, 34 insertions(+), 54 deletions(-) diff --git a/src/network/scorpio_udp.cpp b/src/network/scorpio_udp.cpp index 34d69b1..29318ca 100644 --- a/src/network/scorpio_udp.cpp +++ b/src/network/scorpio_udp.cpp @@ -493,8 +493,8 @@ void ScorpioUdp::handle_connect_packet(const MessageHeader& header, const UdpDat SCU_LOG_ERROR(_logger, "Received ALREADY_CONNECTED for non-existing connection ip: {}, port: {}", udp_data.ip.str(), udp_data.port); } else if (connection->connection_id() != connection_id) { - SCU_LOG_ERROR(_logger, "Received ALREADY_CONNECTED for connection with different connection_id. ip: {}, port: {}, " - "existing connection_id: {}, received connection_id: {}", + SCU_LOG_ERROR(_logger, "Received ALREADY_CONNECTED for connection with different connection_id. " + "ip: {}, port: {}, existing connection_id: {}, received connection_id: {}", udp_data.ip.str(), udp_data.port, connection->connection_id(), connection_id); } else if (connection->_state.compare_exchange_strong(expected, ScorpioUdpConnection::State::CONNECTED, std::memory_order_relaxed, std::memory_order_relaxed)) { diff --git a/test/network/test_scorpio_udp_framework.cpp b/test/network/test_scorpio_udp_framework.cpp index 7651907..9ae9994 100644 --- a/test/network/test_scorpio_udp_framework.cpp +++ b/test/network/test_scorpio_udp_framework.cpp @@ -2050,23 +2050,18 @@ TEST_F(ScorpioUdpTester, already_closed_response_closes_active_stream) { } // ===================================================================== -// Suite 8 — bug-hunt and additional coverage +// Suite 8 — regression tests and additional coverage // -// Tests in this suite either compromise a suspected bug in -// src/network/scorpio_udp.cpp (those are EXPECTED to fail with the current -// implementation and serve as living bug reports) or fill obvious coverage -// gaps. See /home/igor/.claude/plans/try-to-find-bugs-composed-puffin.md -// for the bug report. +// Tests in this suite were originally written as bug reports for +// src/network/scorpio_udp.cpp. All bugs have since been fixed; these +// tests now serve as regression coverage. // ===================================================================== -// Compromises bug 1: handle_connect_packet's ALREADY_CONNECTED branch -// (scorpio_udp.cpp:450-457) performs no state transition, so the -// connection's processing_thread stays in its CONNECTING-retransmit loop -// (scorpio_udp.cpp:1059-1064) forever. A correctly-fixed implementation -// should treat peer's ALREADY_CONNECTED as confirmation and transition to -// CONNECTED, after which the heartbeat loop starts emitting HEARTBEAT -// packets. We assert that a HEARTBEAT is observed; with the bug present -// no HEARTBEAT is ever sent and ExpectPacketTimeout will time out. +// Regression for bug 1: handle_connect_packet's ALREADY_CONNECTED branch +// (scorpio_udp.cpp:450-457) was not transitioning to CONNECTED, leaving +// the connection in its CONNECTING-retransmit loop. Fix: treat peer's +// ALREADY_CONNECTED as confirmation, transition to CONNECTED, and start +// the heartbeat loop. Test asserts a HEARTBEAT is observed. TEST_F(ScorpioUdpTester, connect_handles_already_connected_response) { auto connection_handle = ConnectionHandle::create(); std::vector events; @@ -2083,13 +2078,11 @@ TEST_F(ScorpioUdpTester, connect_handles_already_connected_response) { execute_test(events); } -// Compromises bug 2: handle_connect_packet's CREATE_STREAM:REJECT handler -// (scorpio_udp.cpp:832-866) never transitions the local stream out of -// CREATING. As a result update() (scorpio_udp.cpp:1357-1366) keeps emitting -// CREATE_STREAM:CREATE on every heartbeat tick until SCU_UDP_CREATE_RETRY_ -// PERIOD elapses (5 s simulated). A fix should transition to REJECTED or -// ERROR immediately and stop retransmitting. We assert no CREATE_STREAM: -// CREATE is observed after the REJECT response. +// Regression for bug 2: handle_connect_packet's CREATE_STREAM:REJECT handler +// (scorpio_udp.cpp:832-866) was not transitioning the local stream out of +// CREATING, causing repeated CREATE_STREAM:CREATE retransmits until the +// retry timeout. Fix: transition to REJECTED immediately on REJECT response. +// Test asserts no CREATE_STREAM:CREATE follows the REJECT. TEST_F(ScorpioUdpTester, outgoing_stream_rejected_transitions_out_of_creating) { std::vector events; auto connection_handle = create_connection(events); @@ -2248,13 +2241,10 @@ TEST_F(ScorpioUdpTester, stream_send_after_close_returns_false) { execute_test(events); } -// Documents bug 4: heartbeat_packet_handler reads data[pos++] without -// bounds-checking when a stream listed in the heartbeat is not known -// locally (scorpio_udp.cpp:937-950). This test feeds a malformed heartbeat -// body that ends mid-field; the connection should not panic. In practice -// std::vector's allocator slack means the OOB read returns junk rather -// than segfaulting, but the bug is real and should be fixed with an -// explicit pos < data.size() check. +// Regression for bug 4: heartbeat_packet_handler read data[pos++] without +// bounds-checking when a stream in the heartbeat was not known locally +// (scorpio_udp.cpp:937-950). Fix: explicit pos < data.size() check. +// Test feeds a truncated heartbeat body and asserts no crash/panic. TEST_F(ScorpioUdpTester, heartbeat_unknown_stream_truncated_does_not_crash) { std::vector events; auto connection_handle = create_connection(events); @@ -2351,20 +2341,15 @@ TEST_F(ScorpioUdpTester, bare_heartbeat_keeps_connection_alive) { } // ===================================================================== -// Suite 9 - additional bug-hunt and coverage. See plan file -// try-to-find-bugs-drifting-firefly.md (under ~/.claude/plans) for the -// full bug report. Bug numbers below continue the numbering from Suite 8. +// Suite 9 — regression tests and additional coverage. +// Bug numbers continue from Suite 8; all fixed. // ===================================================================== -// Compromises bug 5: heartbeat_packet_handler at scorpio_udp.cpp:937-955 -// delegates per-stream-entry parsing to ScorpioUdpStream::handle_heartbeat_ -// data, but that function returns early without advancing `pos` when the -// stream is unreliable (scorpio_udp.cpp:1589-1591). The outer loop then -// reads the next stream_num from the middle of the unreliable entry's body -// (range_count byte + first byte of initial_end) and emits CLOSE_STREAM -// for whatever stream id those misaligned bytes decode to. With an all- -// zero range body the spurious target is stream 0. A correctly-fixed -// implementation must not emit a CLOSE_STREAM for any unrelated stream. +// Regression for bug 5: heartbeat_packet_handler delegated to +// handle_heartbeat_data but did not advance `pos` for unreliable streams +// (scorpio_udp.cpp:1589-1591), causing the outer loop to misread the next +// stream_num and emit a spurious CLOSE_STREAM. Fix: always advance pos +// after each stream entry. Test asserts no CLOSE_STREAM is emitted. TEST_F(ScorpioUdpTester, heartbeat_for_unreliable_stream_does_not_emit_spurious_close) { std::vector events; auto connection_handle = create_connection(events); @@ -2379,11 +2364,9 @@ TEST_F(ScorpioUdpTester, heartbeat_for_unreliable_stream_does_not_emit_spurious_ execute_test(events); } -// Compromises bug 5 in a multi-entry heartbeat. A reliable stream entry -// with a single gap is followed by an unreliable stream entry. The -// reliable retransmit (seq 1) is correct; after that, the parser is -// misaligned and emits a CLOSE_STREAM for some stream id decoded from -// the unreliable entry's body. No CLOSE_STREAM is acceptable. +// Regression for bug 5 in a multi-entry heartbeat: reliable then unreliable +// stream. The misaligned pos after the unreliable entry caused a spurious +// CLOSE_STREAM. Test asserts no CLOSE_STREAM follows the correct retransmit. TEST_F(ScorpioUdpTester, heartbeat_mixed_reliable_unreliable_does_not_corrupt_parsing) { std::vector events; auto connection_handle = create_connection(events); @@ -2433,11 +2416,10 @@ TEST_F(ScorpioUdpTester, heartbeat_mixed_reliable_unreliable_does_not_corrupt_pa execute_test(events); } -// Compromises bug 12: incoming CREATE_STREAM with an unsupported QoS -// (UNRELIABLE_LATEST_ONLY or RELIABLE_UNORDERED) is silently accepted at -// scorpio_udp.cpp:766-769 even though create_stream() asserts is_supported -// for the outgoing path. A correctly-fixed implementation must respond -// with REJECT for an unsupported reliability mode. +// Regression for bug 12: incoming CREATE_STREAM with unsupported QoS +// (UNRELIABLE_LATEST_ONLY or RELIABLE_UNORDERED) was silently accepted +// (scorpio_udp.cpp:766-769). Fix: respond with REJECT for unsupported modes. +// Test asserts a REJECT is sent. TEST_F(ScorpioUdpTester, peer_unsupported_qos_create_stream_should_be_rejected) { std::vector events; auto connection_handle = create_connection(events); @@ -2604,5 +2586,3 @@ TEST_F(ScorpioUdpTester, unknown_command_byte_is_logged_and_ignored) { close_connection(events, connection_handle); execute_test(events); } - -// TODO(@Igor): FIXME - above comments about bugs are stale update them From a14b96b88795fe27ccd0af1dc22388cd879dcf7f Mon Sep 17 00:00:00 2001 From: igorosky Date: Thu, 18 Jun 2026 21:07:52 +0200 Subject: [PATCH 4/4] -fsanitize Signed-off-by: igorosky --- CMakeLists.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b69d2b6..4fa810a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -256,7 +256,8 @@ if(BUILD_TESTING) -DSCU_LOG_LEVEL=5 ) target_include_directories(network_framework_test PRIVATE SYSTEM ${GMOCK_INCLUDE_DIRS}) - target_link_libraries(network_framework_test ${GMOCK_MAIN_LIBRARIES} ${GMOCK_LIBRARIES}) + target_link_libraries(network_framework_test -fsanitize=address ${GMOCK_MAIN_LIBRARIES} ${GMOCK_LIBRARIES}) + target_compile_options(network_framework_test PRIVATE -fsanitize=address -fno-omit-frame-pointer) target_link_libraries(test_scorpio_udp_framework buffered_file_saver