Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,8 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
IPropertyTree * pageCache = topology->queryPropTree("pageCache");

bool isBatchRoxie = strisame(roxieMode, "batch");
useTcpTransport = strisame(protocol, "tcp");
useTcpTransport = strisame(protocol, "tcp") || strisame(protocol, "uds");
useUdsTransport = strisame(protocol, "uds");
bool usingRemoteStorage = (pageCache != nullptr);

// --- These options have the following effects on the defaults:
Expand Down
6 changes: 3 additions & 3 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ class RoxieTcpListener : public CSocketConnectionListener
{
public:
RoxieTcpListener(IRoxieWorkerRequestReceiver & _receiver)
: CSocketConnectionListener(0, false, 0, 0), receiver(_receiver)
: CSocketConnectionListener(0, false, 0, 0, true, useUdsTransport), receiver(_receiver)
{
}

Expand Down Expand Up @@ -466,7 +466,7 @@ class RoxieTcpListener : public CSocketConnectionListener
class RoxieTcpWorkerCommunicator : public CInterfaceOf<IRoxieWorkerCommunicator>
{
public:
RoxieTcpWorkerCommunicator() : sender(true) {}
RoxieTcpWorkerCommunicator() : sender(true, useUdsTransport) {}

virtual size32_t queryMaxPacketSize() const override
{
Expand All @@ -478,7 +478,7 @@ class RoxieTcpWorkerCommunicator : public CInterfaceOf<IRoxieWorkerCommunicator>
assertex(!running);
running = true;
listener.reset(new RoxieTcpListener(_receiver));
listener->startPort(ccdMulticastPort);
listener->startPort(ccdMulticastPort, useUdsTransport);
}

virtual void stopListening() override
Expand Down
4 changes: 2 additions & 2 deletions roxie/udplib/tcptrr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class CTcpReceiveManager : implements IReceiveManager, public CInterface
{
public:
PacketListener(CTcpReceiveManager & _receiver)
: CSocketConnectionListener(0, false, 0, 0), receiver(_receiver)
: CSocketConnectionListener(0, false, 0, 0, true, useUdsTransport), receiver(_receiver)
{
maxInitialReadSize = roxiemem::DATA_ALIGNMENT_SIZE * 4;
}
Expand Down Expand Up @@ -150,7 +150,7 @@ class CTcpReceiveManager : implements IReceiveManager, public CInterface
udpBufferManager = bufferManager; // ugly global variable...
if (!collateDirectly)
collatorThread.start(false);
listener.startPort(data_port);
listener.startPort(data_port, useUdsTransport);
}

~CTcpReceiveManager()
Expand Down
2 changes: 1 addition & 1 deletion roxie/udplib/tcptrs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ using roxiemem::DataBuffer;
class DataBufferTcpSender : public CTcpSender
{
public:
DataBufferTcpSender(bool _lowLatency) : CTcpSender(_lowLatency) {}
DataBufferTcpSender(bool _lowLatency) : CTcpSender(_lowLatency, useUdsTransport) {}

protected:
virtual void releaseBuffer(void * buffer) override
Expand Down
1 change: 1 addition & 0 deletions roxie/udplib/udplib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ extern UDPLIB_API bool udpEncryptOnSendThread;
//Should be in ccd
extern UDPLIB_API unsigned multicastTTL;
extern UDPLIB_API bool useTcpTransport;
extern UDPLIB_API bool useUdsTransport;

// -- Reported metrics --------------------------------------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions roxie/udplib/udpsha.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ bool udpEncryptOnSendThread = false;

unsigned multicastTTL = 1;
bool useTcpTransport = false;
bool useUdsTransport = false;

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
Expand Down
119 changes: 102 additions & 17 deletions system/jlib/jsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <arpa/inet.h>
Expand Down Expand Up @@ -385,7 +386,7 @@ struct xfd_set { __fd_mask fds_bits[XFD_SETSIZE / __NFDBITS]; }; // define our o
#define T_SOCKET int
#define SEND_FLAGS (MSG_NOSIGNAL)
#endif
enum SOCKETMODE { sm_tcp_server, sm_tcp, sm_udp_server, sm_udp, sm_multicast_server, sm_multicast};
enum SOCKETMODE { sm_tcp_server, sm_tcp, sm_udp_server, sm_udp, sm_multicast_server, sm_multicast, sm_unix_server, sm_unix };

#define BADSOCKERR(err) ((err==JSE_BADF)||(err==JSE_NOTSOCK))

Expand Down Expand Up @@ -656,7 +657,7 @@ class CSocket: public ISocket, public CInterface
size32_t udp_write_to(const SocketEndpoint &ep,void const* buf, size32_t size);
void close();
void errclose();
bool connectionless() { return (sockmode!=sm_tcp)&&(sockmode!=sm_tcp_server); }
bool connectionless() { return (sockmode!=sm_tcp)&&(sockmode!=sm_tcp_server)&&(sockmode!=sm_unix)&&(sockmode!=sm_unix_server); }
void shutdown(unsigned mode=SHUTDOWN_READWRITE);
void shutdownNoThrow(unsigned mode);

Expand Down Expand Up @@ -933,6 +934,7 @@ typedef union {
struct sockaddr sa;
struct sockaddr_in6 sin6;
struct sockaddr_in sin;
struct sockaddr_un sun;
} J_SOCKADDR;

#define DEFINE_SOCKADDR(name) J_SOCKADDR name; memset(&name,0,sizeof(J_SOCKADDR))
Expand Down Expand Up @@ -966,6 +968,16 @@ inline void LogErr(unsigned err,unsigned ref,const char *info,unsigned lineno,co
}


inline socklen_t setUnixSockAddr(J_SOCKADDR &u, const IpAddress &ip, unsigned short port) {
StringBuffer ipStr;
ip.getIpText(ipStr);
ipStr.replace((char) '.', (char) '_');
ipStr.replace((char) ':', (char) '_');
u.sun.sun_family = AF_UNIX;
snprintf(u.sun.sun_path, sizeof(u.sun.sun_path), "/tmp/hpcc_uds_%s_%u.sock", ipStr.str(), port);
return sizeof(u.sun);
}


inline socklen_t setSockAddr(J_SOCKADDR &u, const IpAddress &ip,unsigned short port)
{
Expand Down Expand Up @@ -1137,7 +1149,7 @@ static bool set_socket_nonblock(T_SOCKET sock, bool nonblocking)
// Static helper: Prepare a socket for connection (shared by sync and async paths)
// Returns the socket descriptor and sockaddr structure
// Throws exception on error
static T_SOCKET prepare_socket_for_connect(const IpAddress & targetip, unsigned short hostport, J_SOCKADDR & sockaddr, socklen_t & sockaddrlen, const char * tracename)
static T_SOCKET prepare_socket_for_connect(const IpAddress & targetip, unsigned short hostport, SOCKETMODE sockmode, J_SOCKADDR & sockaddr, socklen_t & sockaddrlen, const char * tracename)
{
if (targetip.isNull())
{
Expand All @@ -1148,8 +1160,26 @@ static T_SOCKET prepare_socket_for_connect(const IpAddress & targetip, unsigned
}

memset(&sockaddr, 0, sizeof(J_SOCKADDR));
sockaddrlen = setSockAddr(sockaddr, targetip, hostport);
T_SOCKET sock = ::socket(sockaddr.sa.sa_family, SOCK_STREAM, targetip.isIp4() ? 0 : PF_INET6);
#ifndef _WIN32
if (sockmode == sm_unix_server || sockmode == sm_unix) {
sockaddrlen = setUnixSockAddr(sockaddr, targetip, hostport);
}
else
#endif
{
sockaddrlen = setSockAddr(sockaddr, targetip, hostport);
}

T_SOCKET sock;
#ifndef _WIN32
if (sockmode == sm_unix_server || sockmode == sm_unix) {
sock = ::socket(AF_UNIX, SOCK_STREAM, 0);
}
else
#endif
{
sock = ::socket(sockaddr.sa.sa_family, SOCK_STREAM, targetip.isIp4() ? 0 : PF_INET6);
}
if (sock == INVALID_SOCKET)
{
int err = SOCKETERRNO();
Expand Down Expand Up @@ -1237,7 +1267,7 @@ int CSocket::pre_connect(bool block)
{
DEFINE_SOCKADDR(u);
socklen_t ul;
sock = prepare_socket_for_connect(targetip, hostport, u, ul, tracename);
sock = prepare_socket_for_connect(targetip, hostport, sockmode, u, ul, tracename);
owned = true;
state = ss_pre_open; // will be set to open by post_connect

Expand Down Expand Up @@ -1271,7 +1301,7 @@ void CSocket::prepareForAsyncConnect(struct sockaddr *& addr, size32_t & addrlen
// Prepare the socket for async connection
DEFINE_SOCKADDR(u);
socklen_t ul;
sock = prepare_socket_for_connect(targetip, hostport, u, ul, tracename);
sock = prepare_socket_for_connect(targetip, hostport, sockmode, u, ul, tracename);
owned = true;
state = ss_pre_open; // will be set to open by finishAsyncConnect

Expand Down Expand Up @@ -1330,10 +1360,19 @@ void CSocket::open(int listen_queue_size,bool reuseports)
// This is used when a unique IP:port is needed for MP client
// INode/IGroup internals, but client never actually accepts connections.

if (IP6preferred)
sock = ::socket(AF_INET6, connectionless()?SOCK_DGRAM:SOCK_STREAM, PF_INET6);
#ifndef _WIN32
if (sockmode == sm_unix_server || sockmode == sm_unix) {
sock = ::socket(AF_UNIX, connectionless()?SOCK_DGRAM:SOCK_STREAM, 0);
}
else
sock = ::socket(AF_INET, connectionless()?SOCK_DGRAM:SOCK_STREAM, 0);
#endif
{
if (IP6preferred)
sock = ::socket(AF_INET6, connectionless()?SOCK_DGRAM:SOCK_STREAM, PF_INET6);
else
sock = ::socket(AF_INET, connectionless()?SOCK_DGRAM:SOCK_STREAM, 0);
}

if (sock == INVALID_SOCKET) {
THROWJSOCKTARGETEXCEPTION(SOCKETERRNO());
}
Expand Down Expand Up @@ -1366,11 +1405,22 @@ void CSocket::open(int listen_queue_size,bool reuseports)

DEFINE_SOCKADDR(u);
socklen_t ul;
if (!targetip.isNull()) {
ul = setSockAddr(u,targetip,hostport);
#ifndef _WIN32
if (sockmode == sm_unix_server) {
IpAddress bindIp;
if (targetip.isNull()) GetHostIp(bindIp); else bindIp = targetip;
ul = setUnixSockAddr(u, bindIp, hostport);
::unlink(u.sun.sun_path); // MUST clean up previous dead file before bind
}
else
#endif
{
if (!targetip.isNull()) {
ul = setSockAddr(u,targetip,hostport);
}
else
ul = setSockAddrAny(u,hostport);
}
else
ul = setSockAddrAny(u,hostport);
int saverr;
if (::bind(sock, &u.sa, ul) != 0) {
saverr = SOCKETERRNO();
Expand Down Expand Up @@ -1494,7 +1544,17 @@ ISocket* CSocket::accept(bool allowcancel)
}

SocketEndpoint peerEp;
getSockAddrEndpoint(peerSockAddr, peerSockAddrLen, peerEp);
#ifndef _WIN32
if (peerSockAddr.sa.sa_family == AF_UNIX) {
// dummy up peer IP so logging logic doesn't crash on AF_UNIX structures
peerEp.set("127.0.0.1", 1);
}
else
#endif
{
getSockAddrEndpoint(peerSockAddr, peerSockAddrLen, peerEp);
}

CSocket *ret = new CSocket(newsock,sm_tcp,true,&peerEp);
ret->checkCfgKeepAlive();
ret->set_inherit(false);
Expand Down Expand Up @@ -2017,12 +2077,12 @@ ISocket * ISocket::connect_wait( const SocketEndpoint & ep, unsigned timems)

// Create a socket prepared for async connection - returns socket descriptor and sockaddr for use with io_uring
// memory for addr is allocated by this function and must be freed -- via free() -- by the caller
ISocket * ISocket::createForAsyncConnect(const SocketEndpoint & ep, struct sockaddr *& addr, size32_t & addrlen)
ISocket * ISocket::createForAsyncConnect(const SocketEndpoint & ep, struct sockaddr *& addr, size32_t & addrlen, bool useUDS)
{
if (ep.isNull() || (ep.port==0))
THROWJSOCKEXCEPTION(JSOCKERR_bad_address);

Owned<CSocket> csock = new CSocket(ep, sm_tcp, NULL);
Owned<CSocket> csock = new CSocket(ep, useUDS ? sm_unix : sm_tcp, NULL);
csock->prepareForAsyncConnect(addr, addrlen);
return csock.getClear();
}
Expand Down Expand Up @@ -7748,3 +7808,28 @@ extern jlib_decl void shutdownAndCloseNoThrow(ISocket * optSocket)
e->Release();
}
}

ISocket* ISocket::unix_create(unsigned short port, int listen_queue_size)
{
#ifndef _WIN32
assertex(port != 0); // Need to consider how port 0 fallback works since there are no ephemeral local socket ports out-of-the-box
SocketEndpoint dummyEp;
dummyEp.port = port;
Owned<CSocket> sock = new CSocket(dummyEp, sm_unix_server, NULL);
sock->open(listen_queue_size);
return sock.getClear();
#else
return create(port, listen_queue_size);
#endif
}

ISocket* ISocket::unix_connect(const SocketEndpoint &ep)
{
#ifndef _WIN32
Owned<CSocket> sock = new CSocket(ep, sm_unix, NULL);
sock->connect_wait(DEFAULT_CONNECT_TIME);
return sock.getClear();
#else
return connect(ep);
#endif
}
6 changes: 5 additions & 1 deletion system/jlib/jsocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class jlib_decl ISocket : extends IInterface

// Async connection support for io_uring
//
static ISocket* createForAsyncConnect(const SocketEndpoint & ep, struct sockaddr *& addr, size32_t & addrlen);
static ISocket* createForAsyncConnect(const SocketEndpoint & ep, struct sockaddr *& addr, size32_t & addrlen, bool useUDS);
static void completeAsyncConnect(ISocket * socket, int connectResult);


Expand Down Expand Up @@ -333,6 +333,10 @@ class jlib_decl ISocket : extends IInterface
//
static ISocket* attach(int s,bool tcpip=true);

// --- UDS Fast-Path ---
static ISocket* unix_create(unsigned short port, int listen_queue_size = DEFAULT_LISTEN_QUEUE_SIZE);
static ISocket* unix_connect(const SocketEndpoint &ep);

// suppresGCIfMinSize - if true, will suppress graceful close if size_read >= min_size
// This is the default behavior for backwards compatibility.
// Set to false, to allow caller to see graceful close even if size_read >= min_size
Expand Down
21 changes: 14 additions & 7 deletions system/security/securesocket/socketutils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ void CReadSelectHandler::clearupSocketHandlers()

//---------------------------------------------------------------------------------------------------------------------

CSocketConnectionListener::CSocketConnectionListener(unsigned port, bool _useTLS, unsigned _inactiveCloseTimeoutMs, unsigned _maxListenHandlerSockets, bool _useIOUring)
CSocketConnectionListener::CSocketConnectionListener(unsigned port, bool _useTLS, unsigned _inactiveCloseTimeoutMs, unsigned _maxListenHandlerSockets, bool _useIOUring, bool _useUDS)
: CReadSelectHandler(_inactiveCloseTimeoutMs, _maxListenHandlerSockets, _useIOUring), Thread("CSocketConnectionListener"), useTLS(_useTLS)
{
// Determine accept method based on io_uring availability and configuration
Expand All @@ -438,7 +438,7 @@ CSocketConnectionListener::CSocketConnectionListener(unsigned port, bool _useTLS
acceptMethod.store(AcceptMethod::SelectThread);

if (port)
startPort(port);
startPort(port, _useUDS);

if (useTLS)
secureContextServer.setown(createSecureSocketContextSecretSrv("local", nullptr, true));
Expand Down Expand Up @@ -484,12 +484,15 @@ bool CSocketConnectionListener::checkSelfDestruct(const void *p,size32_t sz)
return true;
}

void CSocketConnectionListener::startPort(unsigned short port)
void CSocketConnectionListener::startPort(unsigned short port, bool useUDS)
{
if (!listenSocket)
{
unsigned listenQueueSize = 600; // default
listenSocket.setown(ISocket::create(port, listenQueueSize));
if (useUDS)
listenSocket.setown(ISocket::unix_create(port, listenQueueSize));
else
listenSocket.setown(ISocket::create(port, listenQueueSize));
}

AcceptMethod method = acceptMethod.load();
Expand Down Expand Up @@ -806,7 +809,7 @@ void CSocketConnectionListener::onAsyncComplete(int result)
// until it's cancelled or encounters an error
if (aborting.load())
return;

handleAcceptedConnection(result);

// If using single-shot accept (not multishot), we need to re-queue another accept
Expand All @@ -830,7 +833,11 @@ void CSocketTarget::connect()
// Must be called within a critical section....
try
{
socket.setown(ISocket::connect_timeout(ep, 5000));
if (sender.useUDS)
socket.setown(ISocket::unix_connect(ep));
else
socket.setown(ISocket::connect_timeout(ep, 5000));

if (socket)
{
// Keep track of the number of connections - a useful stat, and to distinguish between initial connection and reconnection.
Expand Down Expand Up @@ -987,7 +994,7 @@ void CSocketTarget::startAsyncConnect()
if (enableAsyncConnect && sender.asyncSender)
{
size32_t addrlen = 0;
socket.setown(ISocket::createForAsyncConnect(ep, addr, addrlen));
socket.setown(ISocket::createForAsyncConnect(ep, addr, addrlen, sender.useUDS));
if (socket)
{
// Queue the async connect operation
Expand Down
Loading
Loading