diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 704cebec0e0..10530e09bf3 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -894,7 +894,8 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) IPropertyTree * pageCache = topology->queryPropTree("pageCache"); bool isBatchRoxie = strisame(roxieMode, "batch"); - useTcpTransport = strisame(protocol, "tcp"); + useTcpTransport = strisame(protocol, "tcp") || strisame(protocol, "uds"); + useUdsTransport = strisame(protocol, "uds"); bool usingRemoteStorage = (pageCache != nullptr); // --- These options have the following effects on the defaults: diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index acbfdc32e8d..6dc1ad82556 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -432,7 +432,7 @@ class RoxieTcpListener : public CSocketConnectionListener { public: RoxieTcpListener(IRoxieWorkerRequestReceiver & _receiver) - : CSocketConnectionListener(0, false, 0, 0), receiver(_receiver) + : CSocketConnectionListener(0, false, 0, 0, true, useUdsTransport), receiver(_receiver) { } @@ -466,7 +466,7 @@ class RoxieTcpListener : public CSocketConnectionListener class RoxieTcpWorkerCommunicator : public CInterfaceOf { public: - RoxieTcpWorkerCommunicator() : sender(true) {} + RoxieTcpWorkerCommunicator() : sender(true, useUdsTransport) {} virtual size32_t queryMaxPacketSize() const override { @@ -478,7 +478,7 @@ class RoxieTcpWorkerCommunicator : public CInterfaceOf assertex(!running); running = true; listener.reset(new RoxieTcpListener(_receiver)); - listener->startPort(ccdMulticastPort); + listener->startPort(ccdMulticastPort, useUdsTransport); } virtual void stopListening() override diff --git a/roxie/udplib/tcptrr.cpp b/roxie/udplib/tcptrr.cpp index 8422595f107..8f0b7a578de 100644 --- a/roxie/udplib/tcptrr.cpp +++ b/roxie/udplib/tcptrr.cpp @@ -107,7 +107,7 @@ class CTcpReceiveManager : implements IReceiveManager, public CInterface { public: PacketListener(CTcpReceiveManager & _receiver) - : CSocketConnectionListener(0, false, 0, 0), receiver(_receiver) + : CSocketConnectionListener(0, false, 0, 0, true, useUdsTransport), receiver(_receiver) { maxInitialReadSize = roxiemem::DATA_ALIGNMENT_SIZE * 4; } @@ -150,7 +150,7 @@ class CTcpReceiveManager : implements IReceiveManager, public CInterface udpBufferManager = bufferManager; // ugly global variable... if (!collateDirectly) collatorThread.start(false); - listener.startPort(data_port); + listener.startPort(data_port, useUdsTransport); } ~CTcpReceiveManager() diff --git a/roxie/udplib/tcptrs.cpp b/roxie/udplib/tcptrs.cpp index cb2dcaf9e01..ee3289a2b7f 100644 --- a/roxie/udplib/tcptrs.cpp +++ b/roxie/udplib/tcptrs.cpp @@ -44,7 +44,7 @@ using roxiemem::DataBuffer; class DataBufferTcpSender : public CTcpSender { public: - DataBufferTcpSender(bool _lowLatency) : CTcpSender(_lowLatency) {} + DataBufferTcpSender(bool _lowLatency) : CTcpSender(_lowLatency, useUdsTransport) {} protected: virtual void releaseBuffer(void * buffer) override diff --git a/roxie/udplib/udplib.hpp b/roxie/udplib/udplib.hpp index 9040f252dc0..273f7563ee4 100644 --- a/roxie/udplib/udplib.hpp +++ b/roxie/udplib/udplib.hpp @@ -229,6 +229,7 @@ extern UDPLIB_API bool udpEncryptOnSendThread; //Should be in ccd extern UDPLIB_API unsigned multicastTTL; extern UDPLIB_API bool useTcpTransport; +extern UDPLIB_API bool useUdsTransport; // -- Reported metrics -------------------------------------------------------------------------- diff --git a/roxie/udplib/udpsha.cpp b/roxie/udplib/udpsha.cpp index fdcb11f3d76..2ebc6d764fe 100644 --- a/roxie/udplib/udpsha.cpp +++ b/roxie/udplib/udpsha.cpp @@ -80,6 +80,7 @@ bool udpEncryptOnSendThread = false; unsigned multicastTTL = 1; bool useTcpTransport = false; +bool useUdsTransport = false; MODULE_INIT(INIT_PRIORITY_STANDARD) { diff --git a/system/jlib/jsocket.cpp b/system/jlib/jsocket.cpp index cef553b4abb..d959d34f9da 100644 --- a/system/jlib/jsocket.cpp +++ b/system/jlib/jsocket.cpp @@ -64,6 +64,7 @@ #else #include #include +#include #include #include #include @@ -385,7 +386,7 @@ struct xfd_set { __fd_mask fds_bits[XFD_SETSIZE / __NFDBITS]; }; // define our o #define T_SOCKET int #define SEND_FLAGS (MSG_NOSIGNAL) #endif -enum SOCKETMODE { sm_tcp_server, sm_tcp, sm_udp_server, sm_udp, sm_multicast_server, sm_multicast}; +enum SOCKETMODE { sm_tcp_server, sm_tcp, sm_udp_server, sm_udp, sm_multicast_server, sm_multicast, sm_unix_server, sm_unix }; #define BADSOCKERR(err) ((err==JSE_BADF)||(err==JSE_NOTSOCK)) @@ -656,7 +657,7 @@ class CSocket: public ISocket, public CInterface size32_t udp_write_to(const SocketEndpoint &ep,void const* buf, size32_t size); void close(); void errclose(); - bool connectionless() { return (sockmode!=sm_tcp)&&(sockmode!=sm_tcp_server); } + bool connectionless() { return (sockmode!=sm_tcp)&&(sockmode!=sm_tcp_server)&&(sockmode!=sm_unix)&&(sockmode!=sm_unix_server); } void shutdown(unsigned mode=SHUTDOWN_READWRITE); void shutdownNoThrow(unsigned mode); @@ -933,6 +934,7 @@ typedef union { struct sockaddr sa; struct sockaddr_in6 sin6; struct sockaddr_in sin; + struct sockaddr_un sun; } J_SOCKADDR; #define DEFINE_SOCKADDR(name) J_SOCKADDR name; memset(&name,0,sizeof(J_SOCKADDR)) @@ -966,6 +968,16 @@ inline void LogErr(unsigned err,unsigned ref,const char *info,unsigned lineno,co } +inline socklen_t setUnixSockAddr(J_SOCKADDR &u, const IpAddress &ip, unsigned short port) { + StringBuffer ipStr; + ip.getIpText(ipStr); + ipStr.replace((char) '.', (char) '_'); + ipStr.replace((char) ':', (char) '_'); + u.sun.sun_family = AF_UNIX; + snprintf(u.sun.sun_path, sizeof(u.sun.sun_path), "/tmp/hpcc_uds_%s_%u.sock", ipStr.str(), port); + return sizeof(u.sun); +} + inline socklen_t setSockAddr(J_SOCKADDR &u, const IpAddress &ip,unsigned short port) { @@ -1137,7 +1149,7 @@ static bool set_socket_nonblock(T_SOCKET sock, bool nonblocking) // Static helper: Prepare a socket for connection (shared by sync and async paths) // Returns the socket descriptor and sockaddr structure // Throws exception on error -static T_SOCKET prepare_socket_for_connect(const IpAddress & targetip, unsigned short hostport, J_SOCKADDR & sockaddr, socklen_t & sockaddrlen, const char * tracename) +static T_SOCKET prepare_socket_for_connect(const IpAddress & targetip, unsigned short hostport, SOCKETMODE sockmode, J_SOCKADDR & sockaddr, socklen_t & sockaddrlen, const char * tracename) { if (targetip.isNull()) { @@ -1148,8 +1160,26 @@ static T_SOCKET prepare_socket_for_connect(const IpAddress & targetip, unsigned } memset(&sockaddr, 0, sizeof(J_SOCKADDR)); - sockaddrlen = setSockAddr(sockaddr, targetip, hostport); - T_SOCKET sock = ::socket(sockaddr.sa.sa_family, SOCK_STREAM, targetip.isIp4() ? 0 : PF_INET6); +#ifndef _WIN32 + if (sockmode == sm_unix_server || sockmode == sm_unix) { + sockaddrlen = setUnixSockAddr(sockaddr, targetip, hostport); + } + else +#endif + { + sockaddrlen = setSockAddr(sockaddr, targetip, hostport); + } + + T_SOCKET sock; +#ifndef _WIN32 + if (sockmode == sm_unix_server || sockmode == sm_unix) { + sock = ::socket(AF_UNIX, SOCK_STREAM, 0); + } + else +#endif + { + sock = ::socket(sockaddr.sa.sa_family, SOCK_STREAM, targetip.isIp4() ? 0 : PF_INET6); + } if (sock == INVALID_SOCKET) { int err = SOCKETERRNO(); @@ -1237,7 +1267,7 @@ int CSocket::pre_connect(bool block) { DEFINE_SOCKADDR(u); socklen_t ul; - sock = prepare_socket_for_connect(targetip, hostport, u, ul, tracename); + sock = prepare_socket_for_connect(targetip, hostport, sockmode, u, ul, tracename); owned = true; state = ss_pre_open; // will be set to open by post_connect @@ -1271,7 +1301,7 @@ void CSocket::prepareForAsyncConnect(struct sockaddr *& addr, size32_t & addrlen // Prepare the socket for async connection DEFINE_SOCKADDR(u); socklen_t ul; - sock = prepare_socket_for_connect(targetip, hostport, u, ul, tracename); + sock = prepare_socket_for_connect(targetip, hostport, sockmode, u, ul, tracename); owned = true; state = ss_pre_open; // will be set to open by finishAsyncConnect @@ -1330,10 +1360,19 @@ void CSocket::open(int listen_queue_size,bool reuseports) // This is used when a unique IP:port is needed for MP client // INode/IGroup internals, but client never actually accepts connections. - if (IP6preferred) - sock = ::socket(AF_INET6, connectionless()?SOCK_DGRAM:SOCK_STREAM, PF_INET6); +#ifndef _WIN32 + if (sockmode == sm_unix_server || sockmode == sm_unix) { + sock = ::socket(AF_UNIX, connectionless()?SOCK_DGRAM:SOCK_STREAM, 0); + } else - sock = ::socket(AF_INET, connectionless()?SOCK_DGRAM:SOCK_STREAM, 0); +#endif + { + if (IP6preferred) + sock = ::socket(AF_INET6, connectionless()?SOCK_DGRAM:SOCK_STREAM, PF_INET6); + else + sock = ::socket(AF_INET, connectionless()?SOCK_DGRAM:SOCK_STREAM, 0); + } + if (sock == INVALID_SOCKET) { THROWJSOCKTARGETEXCEPTION(SOCKETERRNO()); } @@ -1366,11 +1405,22 @@ void CSocket::open(int listen_queue_size,bool reuseports) DEFINE_SOCKADDR(u); socklen_t ul; - if (!targetip.isNull()) { - ul = setSockAddr(u,targetip,hostport); +#ifndef _WIN32 + if (sockmode == sm_unix_server) { + IpAddress bindIp; + if (targetip.isNull()) GetHostIp(bindIp); else bindIp = targetip; + ul = setUnixSockAddr(u, bindIp, hostport); + ::unlink(u.sun.sun_path); // MUST clean up previous dead file before bind + } + else +#endif + { + if (!targetip.isNull()) { + ul = setSockAddr(u,targetip,hostport); + } + else + ul = setSockAddrAny(u,hostport); } - else - ul = setSockAddrAny(u,hostport); int saverr; if (::bind(sock, &u.sa, ul) != 0) { saverr = SOCKETERRNO(); @@ -1494,7 +1544,17 @@ ISocket* CSocket::accept(bool allowcancel) } SocketEndpoint peerEp; - getSockAddrEndpoint(peerSockAddr, peerSockAddrLen, peerEp); +#ifndef _WIN32 + if (peerSockAddr.sa.sa_family == AF_UNIX) { + // dummy up peer IP so logging logic doesn't crash on AF_UNIX structures + peerEp.set("127.0.0.1", 1); + } + else +#endif + { + getSockAddrEndpoint(peerSockAddr, peerSockAddrLen, peerEp); + } + CSocket *ret = new CSocket(newsock,sm_tcp,true,&peerEp); ret->checkCfgKeepAlive(); ret->set_inherit(false); @@ -2017,12 +2077,12 @@ ISocket * ISocket::connect_wait( const SocketEndpoint & ep, unsigned timems) // Create a socket prepared for async connection - returns socket descriptor and sockaddr for use with io_uring // memory for addr is allocated by this function and must be freed -- via free() -- by the caller -ISocket * ISocket::createForAsyncConnect(const SocketEndpoint & ep, struct sockaddr *& addr, size32_t & addrlen) +ISocket * ISocket::createForAsyncConnect(const SocketEndpoint & ep, struct sockaddr *& addr, size32_t & addrlen, bool useUDS) { if (ep.isNull() || (ep.port==0)) THROWJSOCKEXCEPTION(JSOCKERR_bad_address); - Owned csock = new CSocket(ep, sm_tcp, NULL); + Owned csock = new CSocket(ep, useUDS ? sm_unix : sm_tcp, NULL); csock->prepareForAsyncConnect(addr, addrlen); return csock.getClear(); } @@ -7748,3 +7808,28 @@ extern jlib_decl void shutdownAndCloseNoThrow(ISocket * optSocket) e->Release(); } } + +ISocket* ISocket::unix_create(unsigned short port, int listen_queue_size) +{ +#ifndef _WIN32 + assertex(port != 0); // Need to consider how port 0 fallback works since there are no ephemeral local socket ports out-of-the-box + SocketEndpoint dummyEp; + dummyEp.port = port; + Owned sock = new CSocket(dummyEp, sm_unix_server, NULL); + sock->open(listen_queue_size); + return sock.getClear(); +#else + return create(port, listen_queue_size); +#endif +} + +ISocket* ISocket::unix_connect(const SocketEndpoint &ep) +{ +#ifndef _WIN32 + Owned sock = new CSocket(ep, sm_unix, NULL); + sock->connect_wait(DEFAULT_CONNECT_TIME); + return sock.getClear(); +#else + return connect(ep); +#endif +} diff --git a/system/jlib/jsocket.hpp b/system/jlib/jsocket.hpp index 6e4013bba80..ec899205a26 100644 --- a/system/jlib/jsocket.hpp +++ b/system/jlib/jsocket.hpp @@ -289,7 +289,7 @@ class jlib_decl ISocket : extends IInterface // Async connection support for io_uring // - static ISocket* createForAsyncConnect(const SocketEndpoint & ep, struct sockaddr *& addr, size32_t & addrlen); + static ISocket* createForAsyncConnect(const SocketEndpoint & ep, struct sockaddr *& addr, size32_t & addrlen, bool useUDS); static void completeAsyncConnect(ISocket * socket, int connectResult); @@ -333,6 +333,10 @@ class jlib_decl ISocket : extends IInterface // static ISocket* attach(int s,bool tcpip=true); + // --- UDS Fast-Path --- + static ISocket* unix_create(unsigned short port, int listen_queue_size = DEFAULT_LISTEN_QUEUE_SIZE); + static ISocket* unix_connect(const SocketEndpoint &ep); + // suppresGCIfMinSize - if true, will suppress graceful close if size_read >= min_size // This is the default behavior for backwards compatibility. // Set to false, to allow caller to see graceful close even if size_read >= min_size diff --git a/system/security/securesocket/socketutils.cpp b/system/security/securesocket/socketutils.cpp index d74d9303432..79962d9bae1 100644 --- a/system/security/securesocket/socketutils.cpp +++ b/system/security/securesocket/socketutils.cpp @@ -425,7 +425,7 @@ void CReadSelectHandler::clearupSocketHandlers() //--------------------------------------------------------------------------------------------------------------------- -CSocketConnectionListener::CSocketConnectionListener(unsigned port, bool _useTLS, unsigned _inactiveCloseTimeoutMs, unsigned _maxListenHandlerSockets, bool _useIOUring) +CSocketConnectionListener::CSocketConnectionListener(unsigned port, bool _useTLS, unsigned _inactiveCloseTimeoutMs, unsigned _maxListenHandlerSockets, bool _useIOUring, bool _useUDS) : CReadSelectHandler(_inactiveCloseTimeoutMs, _maxListenHandlerSockets, _useIOUring), Thread("CSocketConnectionListener"), useTLS(_useTLS) { // Determine accept method based on io_uring availability and configuration @@ -438,7 +438,7 @@ CSocketConnectionListener::CSocketConnectionListener(unsigned port, bool _useTLS acceptMethod.store(AcceptMethod::SelectThread); if (port) - startPort(port); + startPort(port, _useUDS); if (useTLS) secureContextServer.setown(createSecureSocketContextSecretSrv("local", nullptr, true)); @@ -484,12 +484,15 @@ bool CSocketConnectionListener::checkSelfDestruct(const void *p,size32_t sz) return true; } -void CSocketConnectionListener::startPort(unsigned short port) +void CSocketConnectionListener::startPort(unsigned short port, bool useUDS) { if (!listenSocket) { unsigned listenQueueSize = 600; // default - listenSocket.setown(ISocket::create(port, listenQueueSize)); + if (useUDS) + listenSocket.setown(ISocket::unix_create(port, listenQueueSize)); + else + listenSocket.setown(ISocket::create(port, listenQueueSize)); } AcceptMethod method = acceptMethod.load(); @@ -806,7 +809,7 @@ void CSocketConnectionListener::onAsyncComplete(int result) // until it's cancelled or encounters an error if (aborting.load()) return; - + handleAcceptedConnection(result); // If using single-shot accept (not multishot), we need to re-queue another accept @@ -830,7 +833,11 @@ void CSocketTarget::connect() // Must be called within a critical section.... try { - socket.setown(ISocket::connect_timeout(ep, 5000)); + if (sender.useUDS) + socket.setown(ISocket::unix_connect(ep)); + else + socket.setown(ISocket::connect_timeout(ep, 5000)); + if (socket) { // Keep track of the number of connections - a useful stat, and to distinguish between initial connection and reconnection. @@ -987,7 +994,7 @@ void CSocketTarget::startAsyncConnect() if (enableAsyncConnect && sender.asyncSender) { size32_t addrlen = 0; - socket.setown(ISocket::createForAsyncConnect(ep, addr, addrlen)); + socket.setown(ISocket::createForAsyncConnect(ep, addr, addrlen, sender.useUDS)); if (socket) { // Queue the async connect operation diff --git a/system/security/securesocket/socketutils.hpp b/system/security/securesocket/socketutils.hpp index a288fc1aede..301f084b3cf 100644 --- a/system/security/securesocket/socketutils.hpp +++ b/system/security/securesocket/socketutils.hpp @@ -158,10 +158,10 @@ class SECURESOCKET_API CSocketConnectionListener : protected CReadSelectHandler, public: // _useIOUring: true to enable io_uring multishot accept (if available), false to use traditional thread-based accept // Defaults to true, but can be overridden by expert/@useIOUring configuration setting - CSocketConnectionListener(unsigned port, bool _useTLS, unsigned _inactiveCloseTimeoutMs, unsigned _maxListenHandlerSockets, bool _useIOUring = true); + CSocketConnectionListener(unsigned port, bool _useTLS, unsigned _inactiveCloseTimeoutMs, unsigned _maxListenHandlerSockets, bool _useIOUring, bool _useUDS); ~CSocketConnectionListener() override; - void startPort(unsigned short port); + void startPort(unsigned short port, bool useUDS); void stop(); bool checkSelfDestruct(const void *p,size32_t sz); AcceptMethod getAcceptMethod() const { return acceptMethod.load(); } @@ -266,7 +266,7 @@ class SECURESOCKET_API CTcpSender { friend class CSocketTarget; public: - CTcpSender(bool _lowLatency) : lowLatency(_lowLatency) {} + CTcpSender(bool _lowLatency, bool _useUDS) : lowLatency(_lowLatency), useUDS(_useUDS) {} CSocketTarget * queryWorkerSocket(const SocketEndpoint &ep); void setAsyncProcessor(IAsyncProcessor * _asyncSender) { asyncSender.set(_asyncSender); } @@ -279,6 +279,7 @@ class SECURESOCKET_API CTcpSender std::unordered_map, HashSocketEndpoint> workerSockets; Owned asyncSender; const bool lowLatency; + const bool useUDS; }; #endif diff --git a/testing/unittests/multishotaccepttests.cpp b/testing/unittests/multishotaccepttests.cpp index 64ed11019fc..d015311853c 100644 --- a/testing/unittests/multishotaccepttests.cpp +++ b/testing/unittests/multishotaccepttests.cpp @@ -48,7 +48,7 @@ class TestConnectionListener : public CSocketConnectionListener public: TestConnectionListener(unsigned port, bool useTLS = false, bool useIOUring = true) - : CSocketConnectionListener(port, useTLS, 0, 0, useIOUring) + : CSocketConnectionListener(port, useTLS, 0, 0, useIOUring, false) { } diff --git a/testing/unittests/socketasynctests.cpp b/testing/unittests/socketasynctests.cpp index 5d2783e8869..524a8882d0c 100644 --- a/testing/unittests/socketasynctests.cpp +++ b/testing/unittests/socketasynctests.cpp @@ -411,6 +411,8 @@ class AsyncSocketConnectionTests : public CppUnit::TestFixture return config.getClear(); } + static constexpr bool useUDS = false; + public: void testSyncConnect() { @@ -484,7 +486,7 @@ class AsyncSocketConnectionTests : public CppUnit::TestFixture SockAddrHolder addrHolder; size32_t addrlen = 0; - Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen); + Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen, useUDS); ASSERT(socket.get() != nullptr); ASSERT(addrHolder.get() != nullptr); ASSERT(addrlen > 0); @@ -569,7 +571,7 @@ class AsyncSocketConnectionTests : public CppUnit::TestFixture { size32_t addrlen = 0; - Owned socket = ISocket::createForAsyncConnect(ep, addrHolders[i].getRef(), addrlen); + Owned socket = ISocket::createForAsyncConnect(ep, addrHolders[i].getRef(), addrlen, useUDS); uring->enqueueSocketConnect(socket, addrHolders[i].get(), addrlen, callbacks[i]); sockets.push_back(std::move(socket)); @@ -613,7 +615,7 @@ class AsyncSocketConnectionTests : public CppUnit::TestFixture SockAddrHolder addrHolder; size32_t addrlen = 0; - Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen); + Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen, useUDS); // Simulate failed async connect try @@ -674,7 +676,7 @@ class AsyncSocketConnectionTests : public CppUnit::TestFixture SockAddrHolder addrHolder; size32_t addrlen = 0; - Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen); + Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen, useUDS); ReconnectCallback callback; uring->enqueueSocketConnect(socket, addrHolder.get(), addrlen, callback); @@ -696,7 +698,7 @@ class AsyncSocketConnectionTests : public CppUnit::TestFixture SockAddrHolder addrHolder; size32_t addrlen = 0; - Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen); + Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen, useUDS); ReconnectCallback callback; uring->enqueueSocketConnect(socket, addrHolder.get(), addrlen, callback); @@ -768,7 +770,7 @@ class AsyncSocketConnectionTests : public CppUnit::TestFixture SockAddrHolder addrHolder; size32_t addrlen = 0; - Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen); + Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen, useUDS); TimeoutCallback callback; uring->enqueueSocketConnect(socket, addrHolder.get(), addrlen, callback); @@ -832,7 +834,7 @@ class AsyncSocketConnectionTests : public CppUnit::TestFixture SockAddrHolder addrHolder; size32_t addrlen = 0; - Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen); + Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen, useUDS); ReconnectCallback callback; uring->enqueueSocketConnect(socket, addrHolder.get(), addrlen, callback); @@ -989,7 +991,7 @@ class AsyncSocketConnectionTests : public CppUnit::TestFixture SockAddrHolder addrHolder; size32_t addrlen = 0; - Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen); + Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen, useUDS); RapidCallback callback; uring->enqueueSocketConnect(socket, addrHolder.get(), addrlen, callback); @@ -1051,7 +1053,7 @@ class AsyncSocketConnectionTests : public CppUnit::TestFixture SockAddrHolder addrHolder; size32_t addrlen = 0; - Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen); + Owned socket = ISocket::createForAsyncConnect(ep, addrHolder.getRef(), addrlen, useUDS); CancelCallback callback; uring->enqueueSocketConnect(socket, addrHolder.get(), addrlen, callback);