Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ if(BUILD_TESTING)
ament_target_dependencies(${TEST_NAME} rclcpp std_msgs)
endforeach()

# Real-socket integration suite runs 6 sleep-dominated cases sequentially (~60s),
# right on the ament_add_gtest 60s default. Give it a generous budget.
set_tests_properties(test_scorpio_udp PROPERTIES TIMEOUT 180)

add_library(network_mock STATIC
test/network/mock_udp.cpp
src/network/ip.cpp
Expand Down Expand Up @@ -244,7 +248,7 @@ if(BUILD_TESTING)
)

# Test of scorpio_udp with framework
ament_add_gtest(test_scorpio_udp_framework test/network/test_scorpio_udp_framework.cpp)
ament_add_gtest(test_scorpio_udp_framework test/network/test_scorpio_udp_framework.cpp TIMEOUT 180)

add_library(network_framework_test STATIC
src/network/ip.cpp
Expand All @@ -256,8 +260,8 @@ if(BUILD_TESTING)
-DSCU_LOG_LEVEL=5
)
target_include_directories(network_framework_test PRIVATE SYSTEM ${GMOCK_INCLUDE_DIRS})
target_link_libraries(network_framework_test -fsanitize=address ${GMOCK_MAIN_LIBRARIES} ${GMOCK_LIBRARIES})
target_compile_options(network_framework_test PRIVATE -fsanitize=address -fno-omit-frame-pointer)
target_link_libraries(network_framework_test -fsanitize=address,undefined ${GMOCK_MAIN_LIBRARIES} ${GMOCK_LIBRARIES})
target_compile_options(network_framework_test PRIVATE -fsanitize=address,undefined -fno-omit-frame-pointer)

target_link_libraries(test_scorpio_udp_framework
buffered_file_saver
Expand Down
19 changes: 17 additions & 2 deletions include/scorpio_utils/network/scorpio_udp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ class ScorpioUdpStream : public std::enable_shared_from_this<ScorpioUdpStream> {
bool append_heartbeat_data(std::vector<uint8_t>& heartbeat_data) const;
void handle_heartbeat_data(const std::vector<uint8_t>& data, size_t& pos);
void remove_expired_unreliable_data();
void activate_stream();

public:
SCU_ALWAYS_INLINE bool SCU_EAGER_SELECT_IS_READY() noexcept {
Expand Down Expand Up @@ -355,6 +356,8 @@ class ScorpioUdpConnection : public std::enable_shared_from_this<ScorpioUdpConne
std::atomic<size_t> _received_packet_count;
std::atomic<int64_t> _last_received_heartbeat_time;
std::atomic<size_t> _received_heartbeat_count;
std::atomic<size_t> _send_heartbeat_count;
std::atomic<size_t> _send_partial_heartbeat_count;
std::mutex _panic_mutex;
threading::Signal _start_signal;
std::shared_ptr<logger::Logger> _logger;
Expand All @@ -365,8 +368,14 @@ class ScorpioUdpConnection : public std::enable_shared_from_this<ScorpioUdpConne
threading::Channel<std::shared_ptr<ScorpioUdpStream>, 1024> _new_streams;
threading::Channel<std::pair<MessageHeader, UdpData>, 1024 * 1024> _incoming_packets;
threading::Channel<std::shared_ptr<ScorpioUdpStream>, 1024> _awaiting_streams;
std::array<std::weak_ptr<ScorpioUdpStream>, std::numeric_limits<StreamNumber>::max() + 1> _streams;
std::array<std::atomic<bool>, std::numeric_limits<StreamNumber>::max() + 1> _stream_exists;
constexpr static size_t max_streams_count = std::numeric_limits<StreamNumber>::max() + 1;
// The fact that there is a second level of stream mask makes stream removal impossible
// to do without mutual exclusivity
std::mutex _streams_mask_write_mutex;
std::array<std::atomic<bool>, max_streams_count> _stream_exists;
std::array<std::atomic<uint64_t>, max_streams_count / (64 * 64)> _streams_mask_level_2;
std::array<std::atomic<uint64_t>, max_streams_count / 64> _streams_mask;
std::array<std::weak_ptr<ScorpioUdpStream>, max_streams_count> _streams;
std::thread _processing_thread;
bool connected();
std::shared_ptr<ScorpioUdpStream> get_stream(StreamNumber);
Expand Down Expand Up @@ -474,6 +483,12 @@ class ScorpioUdpConnection : public std::enable_shared_from_this<ScorpioUdpConne
SCU_ALWAYS_INLINE auto received_heartbeat_count() const noexcept {
return _received_heartbeat_count.load(std::memory_order_relaxed);
}
SCU_ALWAYS_INLINE auto send_partial_heartbeat_count() const noexcept {
return _send_partial_heartbeat_count.load(std::memory_order_relaxed);
}
SCU_ALWAYS_INLINE auto send_heartbeat_count() const noexcept {
return _send_heartbeat_count.load(std::memory_order_relaxed);
}
SCU_ALWAYS_INLINE auto retransmission_count() const noexcept {
return _retransmission_count.load(std::memory_order_relaxed);
}
Expand Down
4 changes: 2 additions & 2 deletions include/scorpio_utils/network/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ SCU_ALWAYS_INLINE void host_to_network(const S* src, T* target, size_t size) {

template<typename T>
bool network_to_host(const std::vector<uint8_t>& src, T* target, size_t& pos) {
if (pos + sizeof(T) > src.size()) {
if (SCU_UNLIKELY(pos + sizeof(T) > src.size())) {
return false;
}
network_to_host(src.data() + pos, target, sizeof(T));
Expand All @@ -50,7 +50,7 @@ bool network_to_host(const std::vector<uint8_t>& src, T* target, size_t& pos) {

template<typename T>
bool host_to_network(T src, std::vector<uint8_t>& target, size_t& pos) {
if (pos + sizeof(T) > target.size()) {
if (SCU_UNLIKELY(pos + sizeof(T) > target.size())) {
return false;
}
host_to_network<T>(&src, target.data() + pos, sizeof(T));
Expand Down
Loading
Loading