Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions dc/s2n-quic-dc/src/stream/server/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ use crate::{
};
use core::num::{NonZeroU16, NonZeroUsize};
use s2n_quic_core::ensure;
use std::{io, net::SocketAddr, time::Duration};
use std::{
io,
net::SocketAddr,
os::fd::{AsRawFd, OwnedFd},
sync::Arc,
time::Duration,
};
use tokio::io::unix::AsyncFd;
use tracing::Instrument as _;

Expand Down Expand Up @@ -143,6 +149,7 @@ pub struct Builder {
recv_buffer: Option<usize>,
reuse_addr: Option<bool>,
tls: Option<tcp::tls::Builder>,
attach_reuseport_ebpf: Option<Arc<OwnedFd>>,
}

impl Default for Builder {
Expand All @@ -161,6 +168,7 @@ impl Default for Builder {
recv_buffer: None,
reuse_addr: None,
tls: None,
attach_reuseport_ebpf: None,
}
}
}
Expand Down Expand Up @@ -252,6 +260,11 @@ impl Builder {
self
}

pub fn with_attach_reuseport_ebpf(mut self, fd: Arc<OwnedFd>) -> Self {
self.attach_reuseport_ebpf = Some(fd);
self
}

pub fn build<H: Handshake + Clone, S: event::Subscriber + Clone>(
mut self,
handshake: H,
Expand Down Expand Up @@ -371,6 +384,7 @@ impl Builder {
send_buffer: self.send_buffer,
recv_buffer: self.recv_buffer,
reuse_addr: self.reuse_addr.unwrap_or(false),
attach_reuseport_ebpf: self.attach_reuseport_ebpf,
}
.start()?;

Expand All @@ -392,6 +406,7 @@ struct Start<'a, H: Handshake + Clone, S: event::Subscriber + Clone> {
send_buffer: Option<usize>,
recv_buffer: Option<usize>,
reuse_addr: bool,
attach_reuseport_ebpf: Option<Arc<OwnedFd>>,
}

impl<H: Handshake + Clone, S: event::Subscriber + Clone> Start<'_, H, S> {
Expand Down Expand Up @@ -482,7 +497,8 @@ impl<H: Handshake + Clone, S: event::Subscriber + Clone> Start<'_, H, S> {
options.reuse_address = self.reuse_addr;

// if we have more than one thread then we'll need to use reuse port
if self.concurrency > 1 {
// SO_REUSEPORT is also required for SO_ATTACH_REUSEPORT_EBPF
if self.concurrency > 1 || self.attach_reuseport_ebpf.is_some() {
// if the application is wanting to bind to a random port then we need to set
// reuse_port after
if local_addr.port() == 0 {
Expand Down Expand Up @@ -533,6 +549,27 @@ impl<H: Handshake + Clone, S: event::Subscriber + Clone> Start<'_, H, S> {
self.server.local_addr = socket.local_addr()?;
}

#[cfg(target_os = "linux")]
if let Some(prog_fd) = self.attach_reuseport_ebpf.as_ref() {
let prog_raw_fd: libc::c_int = prog_fd.as_raw_fd();
// SAFETY: setsockopt with valid socket fd, valid optval pointer,
// and matching optlen for SO_ATTACH_REUSEPORT_EBPF. The BPF
// program fd is borrowed via the Arc<OwnedFd>. The kernel takes
// its own reference on success.
let ret = unsafe {
libc::setsockopt(
socket.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_ATTACH_REUSEPORT_EBPF,
&prog_raw_fd as *const libc::c_int as *const libc::c_void,
std::mem::size_of::<libc::c_int>() as libc::socklen_t,
)
};
if ret != 0 {
return Err(io::Error::last_os_error());
}
}

let socket = tokio::io::unix::AsyncFd::new(socket)?;
let id = self.id();
let channel_behavior =
Expand Down
Loading