From d85a81aabdab472cfd16629b65d50202fa708cb1 Mon Sep 17 00:00:00 2001 From: Gifty Edwin Date: Mon, 13 Apr 2026 17:19:55 -0400 Subject: [PATCH] HPCC-XXXXX Add S3 direct API storage plane support Add S3 storage plane support via the file hook mechanism, enabling Thor and Roxie to read/write S3 objects transparently through the IFile/IFileIO interfaces. Core implementation: - S3FileReadIO: read via HTTP GET with Range headers - S3FileWriteIO: buffered writes with automatic PutObject (<8MB) or multipart upload (>=8MB) selection - S3MultipartUpload: manages CreateMultipartUpload/UploadPart/Complete lifecycle with abort-on-destroy safety - S3File: IFile implementation with metadata caching, directory listing via ListObjectsV2 - S3APICopyClient: server-side copy for files <=5GB, multipart copy for larger files - Retry with exponential backoff and jitter for all S3 operations - S3 client cache keyed by storage plane name and device number Platform changes: - Add queryPlaneName() to IStorageApiInfo interface - Add S3 bucket/region/endpoint config to storage plane schema - Add jplane_compat.hpp shim for cross-version jlib compatibility - Update Helm template validation to accept buckets (S3) or containers (Azure) Deployment: - Dockerfile overlay for building S3 hook against stock platform-core - Helm values examples for S3 on EKS with IRSA authentication - Build and deployment guide (helm/examples/s3/README.md) Testing: - ECL integration test verifying putObject vs multipart upload paths - Unit tests for S3 URL validation --- common/remote/hooks/s3/CMakeLists.txt | 5 +- common/remote/hooks/s3/README.md | 66 ++ common/remote/hooks/s3/jplane_compat.hpp | 27 + common/remote/hooks/s3/s3api.cpp | 283 ++++++ common/remote/hooks/s3/s3file.cpp | 1167 +++++++--------------- common/remote/hooks/s3/s3file.hpp | 25 +- common/remote/hooks/s3/s3fileTests.cpp | 4 +- common/remote/hooks/s3/s3utils.cpp | 152 +++ common/remote/hooks/s3/s3utils.hpp | 86 ++ 
dockerfiles/s3-hook-overlay.dockerfile | 44 + helm/examples/s3/README.md | 100 ++ helm/examples/s3/values-s3-eks.yaml | 78 ++ helm/examples/s3/values-s3.yaml | 44 + helm/hpcc/templates/_helpers.tpl | 8 +- helm/hpcc/values.schema.json | 41 +- helm/hpcc/values.yaml | 10 +- system/jlib/jfile.hpp | 1 + system/jlib/jplane.cpp | 9 +- testing/regress/ecl/s3_write_paths.ecl | 28 + 19 files changed, 1359 insertions(+), 819 deletions(-) create mode 100644 common/remote/hooks/s3/README.md create mode 100644 common/remote/hooks/s3/jplane_compat.hpp create mode 100644 common/remote/hooks/s3/s3api.cpp create mode 100644 common/remote/hooks/s3/s3utils.cpp create mode 100644 common/remote/hooks/s3/s3utils.hpp create mode 100644 dockerfiles/s3-hook-overlay.dockerfile create mode 100644 helm/examples/s3/README.md create mode 100644 helm/examples/s3/values-s3-eks.yaml create mode 100644 helm/examples/s3/values-s3.yaml create mode 100644 testing/regress/ecl/s3_write_paths.ecl diff --git a/common/remote/hooks/s3/CMakeLists.txt b/common/remote/hooks/s3/CMakeLists.txt index 1c6a38565e7..d0ae052c8b1 100644 --- a/common/remote/hooks/s3/CMakeLists.txt +++ b/common/remote/hooks/s3/CMakeLists.txt @@ -14,7 +14,6 @@ # limitations under the License. 
################################################################################ - # Component: s3file ##################################################### # Description: @@ -29,8 +28,11 @@ find_package(aws-cpp-sdk-core REQUIRED) find_package(aws-cpp-sdk-s3 REQUIRED) set ( SRCS + s3api.cpp s3file.cpp s3file.hpp + s3utils.cpp + s3utils.hpp ) include_directories ( @@ -65,7 +67,6 @@ if (USE_CPPUNIT) include_directories( ${HPCC_SOURCE_DIR}/testing/unittests ) - # Set runtime path to find s3file library in filehooks directory set_target_properties(s3FileTests PROPERTIES INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/filehooks" ) diff --git a/common/remote/hooks/s3/README.md b/common/remote/hooks/s3/README.md new file mode 100644 index 00000000000..f4a8693769d --- /dev/null +++ b/common/remote/hooks/s3/README.md @@ -0,0 +1,66 @@ +# S3 Direct API File Hook + +Storage hook for accessing S3 objects as HPCC files using the AWS C++ SDK. + +## File Format + +``` +s3:<plane>/<path> +s3:<plane>/d<device>/<path> (multi-device) +``` + +## Architecture + +The hook registers via `installFileHook()` at process startup. When any HPCC +component calls `createIFile("s3:...")`, the hook intercepts it and returns an +`S3File` that implements `IFile`. Opening for read returns `S3FileReadIO`, +opening for write returns `S3FileWriteIO` — both implement `IFileIO`. 
+ +## Features + +- Read via `GetObject` with byte-range requests +- Write coalescing — buffers small writes, auto-selects between: + - `PutObject` for files under 8MB (single HTTP request) + - Multipart upload for files 8MB+ (8MB parts) +- 0-byte objects created for empty file parts (Thor multi-part compatibility) +- Server-side copy via `CopyObject` / multipart copy for large files +- Directory listing via `ListObjectsV2` with prefix/delimiter filtering +- Connection pooling — one S3 client per plane+device combination +- Retry with exponential backoff and jitter for all S3 operations +- IAM credential chain (IRSA on EKS, env vars, ~/.aws/credentials) + +## Configuration + +Storage plane in Helm values: + +```yaml +storage: + planes: + - name: s3data + prefix: "s3:s3data" + category: data + storageapi: + type: s3 + region: us-east-1 + buckets: + - name: my-bucket + secret: my-secret # optional — omit for IAM roles +``` + +## Files + +| File | Description | +|------|-------------| +| `s3file.cpp` | S3File, S3FileReadIO, S3FileWriteIO, S3MultipartUpload, S3DirectoryIterator | +| `s3file.hpp` | Public API — `installFileHook`, `createS3File`, `isS3FileName` | +| `s3api.cpp` | S3FileHook registration, S3APICopyClient, AWS SDK lifecycle | +| `s3utils.cpp` | S3ClientManager — client cache and creation | +| `s3utils.hpp` | Retry helpers, constants, `getS3Client()` API | +| `s3fileTests.cpp` | Unit tests — URL validation | +| `jplane_compat.hpp` | Build shim for cross-version `getStoragePlaneConfig` resolution | +| `CMakeLists.txt` | Build config — links against jlib + aws-cpp-sdk-{s3,core} | + +## Build + +Built as part of the platform via CMake, or as a standalone overlay — see +`dockerfiles/s3-hook-overlay.dockerfile` and `helm/examples/s3/README.md`. 
diff --git a/common/remote/hooks/s3/jplane_compat.hpp b/common/remote/hooks/s3/jplane_compat.hpp
new file mode 100644
index 00000000000..11ee9cc18fa
--- /dev/null
+++ b/common/remote/hooks/s3/jplane_compat.hpp
@@ -0,0 +1,34 @@
+#ifndef JPLANE_HPP
+#define JPLANE_HPP
+
+#include "jfile.hpp"
+#include <dlfcn.h>
+
+// Build shim: resolves the storage-plane lookup at runtime so the hook can load
+// against jlib builds that export either of the two symbols probed below.
+// Returns whatever the resolved function returns; returns nullptr when nothing
+// was found and 'required' is false, and throws (code 99) when 'required' is true.
+inline const IPropertyTree * getStoragePlaneConfig(const char * name, bool required)
+{
+    // Runtime may have getStoragePlaneConfig(const char*, bool) or getStoragePlane(const char*)
+    // Use dlsym to find whichever exists
+    typedef IPropertyTree * (*fn2_t)(const char *, bool);
+    typedef IPropertyTree * (*fn1_t)(const char *);
+    static fn2_t fn2 = (fn2_t)dlsym(RTLD_DEFAULT, "_Z21getStoragePlaneConfigPKcb");
+    static fn1_t fn1 = (fn1_t)dlsym(RTLD_DEFAULT, "_Z15getStoragePlanePKc");
+
+    // Neither symbol resolved: report that explicitly rather than as a missing plane
+    if (!fn2 && !fn1)
+    {
+        if (required)
+            throw makeStringExceptionV(99, "Cannot resolve a storage plane lookup function in jlib (plane '%s')", name);
+        return nullptr;
+    }
+
+    IPropertyTree * result = fn2 ? fn2(name, required) : fn1(name);
+    if (!result && required)
+        throw makeStringExceptionV(99, "Storage plane '%s' not found", name);
+    return result;
+}
+
+#endif
diff --git a/common/remote/hooks/s3/s3api.cpp b/common/remote/hooks/s3/s3api.cpp
new file mode 100644
index 00000000000..9dc9b5a39b9
--- /dev/null
+++ b/common/remote/hooks/s3/s3api.cpp
@@ -0,0 +1,305 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2026 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+// NOTE(review): the header names on these includes were lost when this patch was
+// extracted; they are reconstructed from the names used below - confirm.
+#include <algorithm>
+#include <stdexcept>
+#include <aws/core/Aws.h>
+#include <aws/s3/model/AbortMultipartUploadRequest.h>
+#include <aws/s3/model/CompleteMultipartUploadRequest.h>
+#include <aws/s3/model/CopyObjectRequest.h>
+#include <aws/s3/model/CreateMultipartUploadRequest.h>
+#include <aws/s3/model/HeadObjectRequest.h>
+#include <aws/s3/model/UploadPartCopyRequest.h>
+
+#include "platform.h"
+#include "jlib.hpp"
+#include "jfile.hpp"
+#include "jlog.hpp"
+#include "jutil.hpp"
+
+#include "s3file.hpp"
+#include "s3utils.hpp"
+
+//---------------------------------------------------------------------------------------------------------------------
+// S3 API Copy Client — server-side copy via CopyObject / multipart copy
+
+static constexpr offset_t maxSingleCopySize = (offset_t)5 * 1024 * 1024 * 1024; // 5GB
+static constexpr offset_t copyPartSize = (offset_t)500 * 1024 * 1024; // 500MB
+
+// One server-side copy of a single object. startCopy() runs the whole copy
+// synchronously; the remaining methods only report the recorded outcome.
+class S3APICopyClientOp : public CInterfaceOf<IAPICopyClientOp>
+{
+public:
+    S3APICopyClientOp(const char * _srcBucket, const char * _srcKey,
+                      const char * _tgtBucket, const char * _tgtKey,
+                      const char * _srcPlane, unsigned _srcDevice,
+                      const char * _tgtPlane, unsigned _tgtDevice)
+        : srcBucket(_srcBucket), srcKey(_srcKey), tgtBucket(_tgtBucket), tgtKey(_tgtKey),
+          srcPlane(_srcPlane), srcDevice(_srcDevice), tgtPlane(_tgtPlane), tgtDevice(_tgtDevice) {}
+
+    // Stats the source object, then copies it with CopyObject (size <= maxSingleCopySize)
+    // or a multipart copy. The 'source' argument is unused: the key was fixed at construction.
+    // Any failure sets status to Failed and is rethrown.
+    virtual void startCopy(const char * source) override
+    {
+        try
+        {
+            retryS3("S3::HeadObject", srcKey.str(), [&]()
+            {
+                Aws::S3::Model::HeadObjectRequest headReq;
+                headReq.SetBucket(srcBucket.str());
+                headReq.SetKey(srcKey.str());
+                auto outcome = getSrcClient().HeadObject(headReq);
+                if (!outcome.IsSuccess())
+                {
+                    auto & error = outcome.GetError();
+                    VStringBuffer msg("S3 copy: cannot stat %s/%s: %s - %s", srcBucket.str(), srcKey.str(),
+                        error.GetExceptionName().c_str(), error.GetMessage().c_str());
+                    throw std::runtime_error(msg.str());
+                }
+                srcSize = outcome.GetResult().GetContentLength();
+            });
+            if (srcSize <= maxSingleCopySize)
+                doSimpleCopy();
+            else
+                doMultipartCopy();
+            status = ApiCopyStatus::Success;
+        }
+        catch (...)
+        {
+            status = ApiCopyStatus::Failed;
+            throw;
+        }
+    }
+
+    // Reports the full source size once the copy has succeeded, otherwise 0.
+    virtual ApiCopyStatus getProgress(CDateTime & dateTime, int64_t & outputLength) override
+    {
+        dateTime.clear();
+        outputLength = (status == ApiCopyStatus::Success) ? srcSize : 0;
+        return status;
+    }
+    // Only records the Aborted state; no S3 request is issued here.
+    virtual ApiCopyStatus abortCopy() override { status = ApiCopyStatus::Aborted; return status; }
+    virtual ApiCopyStatus getStatus() const override { return status; }
+
+private:
+    // Single CopyObject request from srcBucket/srcKey to tgtBucket/tgtKey.
+    void doSimpleCopy()
+    {
+        StringBuffer rawSource, copySource;
+        rawSource.appendf("%s/%s", srcBucket.str(), srcKey.str());
+        encodeURL(copySource, rawSource.str());
+        retryS3Op("S3::CopyObject", copySource.str(), [&]()
+        {
+            Aws::S3::Model::CopyObjectRequest request;
+            request.SetBucket(tgtBucket.str());
+            request.SetKey(tgtKey.str());
+            request.SetCopySource(copySource.str());
+            return getTgtClient().CopyObject(request);
+        });
+    }
+
+    // Copies the source in copyPartSize byte ranges via UploadPartCopy, then completes
+    // the upload. If any step after initiation throws, the upload is aborted and the
+    // exception is rethrown.
+    void doMultipartCopy()
+    {
+        Aws::String uploadId;
+        retryS3("S3::CreateMultipartUpload", tgtKey.str(), [&]()
+        {
+            Aws::S3::Model::CreateMultipartUploadRequest initReq;
+            initReq.SetBucket(tgtBucket.str());
+            initReq.SetKey(tgtKey.str());
+            auto outcome = getTgtClient().CreateMultipartUpload(initReq);
+            if (!outcome.IsSuccess())
+            {
+                auto & error = outcome.GetError();
+                VStringBuffer msg("Multipart copy initiate failed for %s/%s: %s - %s",
+                    tgtBucket.str(), tgtKey.str(), error.GetExceptionName().c_str(), error.GetMessage().c_str());
+                throw std::runtime_error(msg.str());
+            }
+            uploadId = outcome.GetResult().GetUploadId();
+        });
+
+        Aws::Vector<Aws::S3::Model::CompletedPart> parts;
+        StringBuffer rawSource, copySource;
+        rawSource.appendf("%s/%s", srcBucket.str(), srcKey.str());
+        encodeURL(copySource, rawSource.str());
+
+        try
+        {
+            unsigned partNum = 1;
+            for (offset_t pos = 0; pos < srcSize; pos += copyPartSize, partNum++)
+            {
+                // Range bounds are inclusive; the final part is clamped to the last byte
+                offset_t end = std::min(pos + copyPartSize - 1, srcSize - 1);
+                VStringBuffer range("bytes=%llu-%llu", (unsigned long long)pos, (unsigned long long)end);
+                retryS3("S3::UploadPartCopy", copySource.str(), [&]()
+                {
+                    Aws::S3::Model::UploadPartCopyRequest partReq;
+                    partReq.SetBucket(tgtBucket.str());
+                    partReq.SetKey(tgtKey.str());
+                    partReq.SetUploadId(uploadId);
+                    partReq.SetPartNumber(partNum);
+                    partReq.SetCopySource(copySource.str());
+                    partReq.SetCopySourceRange(range.str());
+                    auto outcome = getTgtClient().UploadPartCopy(partReq);
+                    if (!outcome.IsSuccess())
+                    {
+                        auto & error = outcome.GetError();
+                        VStringBuffer msg("UploadPartCopy failed for %s: %s - %s", copySource.str(),
+                            error.GetExceptionName().c_str(), error.GetMessage().c_str());
+                        throw std::runtime_error(msg.str());
+                    }
+                    Aws::S3::Model::CompletedPart cp;
+                    cp.SetPartNumber(partNum);
+                    cp.SetETag(outcome.GetResult().GetCopyPartResult().GetETag());
+                    parts.push_back(cp);
+                });
+            }
+            retryS3Op("S3::CompleteMultipartUpload", tgtKey.str(), [&]()
+            {
+                Aws::S3::Model::CompletedMultipartUpload completed;
+                completed.SetParts(parts);
+                Aws::S3::Model::CompleteMultipartUploadRequest completeReq;
+                completeReq.SetBucket(tgtBucket.str());
+                completeReq.SetKey(tgtKey.str());
+                completeReq.SetUploadId(uploadId);
+                completeReq.SetMultipartUpload(completed);
+                return getTgtClient().CompleteMultipartUpload(completeReq);
+            });
+        }
+        catch (...)
+        {
+            Aws::S3::Model::AbortMultipartUploadRequest abortReq;
+            abortReq.SetBucket(tgtBucket.str());
+            abortReq.SetKey(tgtKey.str());
+            abortReq.SetUploadId(uploadId);
+            // A failed abort leaves the uploaded parts stored in the bucket, so report it
+            auto abortOutcome = getTgtClient().AbortMultipartUpload(abortReq);
+            if (!abortOutcome.IsSuccess())
+                OWARNLOG("S3 copy: failed to abort multipart upload for %s/%s: %s - %s", tgtBucket.str(), tgtKey.str(),
+                         abortOutcome.GetError().GetExceptionName().c_str(), abortOutcome.GetError().GetMessage().c_str());
+            throw;
+        }
+    }
+
+    Aws::S3::S3Client & getSrcClient() { return getS3Client(srcPlane.str(), srcDevice); }
+    Aws::S3::S3Client & getTgtClient() { return getS3Client(tgtPlane.str(), tgtDevice); }
+
+    StringAttr srcBucket, srcKey, tgtBucket, tgtKey, srcPlane, tgtPlane;
+    unsigned srcDevice, tgtDevice;
+    offset_t srcSize = 0;
+    ApiCopyStatus status = ApiCopyStatus::NotStarted;
+};
+
+// Copy client handed out by the file hook when source and target are both S3 planes.
+class S3APICopyClient : public CInterfaceOf<IAPICopyClient>
+{
+    Linked<IStorageApiInfo> source, target;
+public:
+    S3APICopyClient(IStorageApiInfo * _source, IStorageApiInfo * _target)
+        : source(_source), target(_target) {}
+    virtual const char * name() const override { return "S3 API copy client"; }
+    // Runs the copy to completion before returning the operation object.
+    virtual IAPICopyClientOp * startCopy(const char * srcPath, unsigned srcStripeNum,
+                                         const char * tgtPath, unsigned tgtStripeNum) const override
+    {
+        Owned<S3APICopyClientOp> op = new S3APICopyClientOp(
+            source->queryStorageContainerName(srcStripeNum), srcPath,
+            target->queryStorageContainerName(tgtStripeNum), tgtPath,
+            source->queryPlaneName(), srcStripeNum,
+            target->queryPlaneName(), tgtStripeNum);
+        op->startCopy(srcPath);
+        return op.getClear();
+    }
+};
+
+//---------------------------------------------------------------------------------------------------------------------
+// File hook — routes s3: filenames to S3File, provides copy client
+
+static bool isS3Type(const char * type) { return type && strieq(type, "s3"); }
+
+class S3FileHook : public CInterfaceOf<IContainedFileHook>
+{
+public:
+    virtual IFile * createIFile(const char * fileName) override
+    {
+        return isS3FileName(fileName) ? createS3File(fileName) : nullptr;
+    }
+    virtual IAPICopyClient * getCopyApiClient(IStorageApiInfo * source, IStorageApiInfo * target) override
+    {
+        if (source && target && isS3Type(source->getStorageType()) && isS3Type(target->getStorageType()))
+            return new S3APICopyClient(source, target);
+        return nullptr;
+    }
+};
+
+static S3FileHook * s3FileHook = nullptr;
+static CriticalSection hookCS;
+
+//---------------------------------------------------------------------------------------------------------------------
+// Exported functions
+
+extern S3FILE_API void installFileHook()
+{
+    CriticalBlock block(hookCS);
+    if (!s3FileHook)
+    {
+        s3FileHook = new S3FileHook;
+        addContainedFileHook(s3FileHook);
+    }
+}
+
+extern S3FILE_API void removeFileHook()
+{
+    CriticalBlock block(hookCS);
+    if (s3FileHook)
+    {
+        removeContainedFileHook(s3FileHook);
+        delete s3FileHook;
+        s3FileHook = nullptr;
+    }
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+// AWS SDK lifecycle
+
+static Aws::SDKOptions awsOptions;
+
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    awsOptions.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Warn;
+#ifndef _WIN32
+    awsOptions.httpOptions.installSigPipeHandler = true;
+#endif
+    Aws::InitAPI(awsOptions);
+    LOG(MCdebugProgress, "AWS SDK initialized for S3 file operations");
+    return true;
+}
+
+MODULE_EXIT()
+{
+    cleanupS3Clients();
+    removeFileHook();
+    Aws::ShutdownAPI(awsOptions);
+    LOG(MCdebugProgress, "AWS SDK shutdown for S3 file operations");
+}
diff --git a/common/remote/hooks/s3/s3file.cpp b/common/remote/hooks/s3/s3file.cpp
index 64a17d1c3a9..a4e37d7ed17 100644
--- a/common/remote/hooks/s3/s3file.cpp
+++ b/common/remote/hooks/s3/s3file.cpp
@@ -1,6 +1,6 @@
 /*##############################################################################
 
-    HPCC SYSTEMS software Copyright (C) 2025 HPCC Systems®.
+    HPCC SYSTEMS software Copyright (C) 2026 HPCC Systems®.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -15,334 +15,65 @@ limitations under the License. ############################################################################## */ -#include -#include -#include -#include #include -#include #include #include -#include #include #include #include #include #include #include +#include #include "platform.h" #include "jlib.hpp" -#include "jio.hpp" -#include "jmutex.hpp" #include "jfile.hpp" -#include "jstring.hpp" #include "jlog.hpp" #include "jptree.hpp" -#include "jexcept.hpp" -#include "jtime.hpp" #include "jplane.hpp" -#include "jsecrets.hpp" +#include "jregexp.hpp" #include "s3file.hpp" +#include "s3utils.hpp" #ifdef _MSC_VER #undef GetObject #endif - -// Constants -constexpr const char* s3FilePrefix = "s3:"; -constexpr size_t s3FilePrefixLen = 3; // Length of "s3:" for pointer arithmetic -constexpr unsigned defaultMaxRetries = 3; - -// Global AWS initialization with reference counting -static unsigned awsInitRefCount = 0; -static CriticalSection awsCS; -static Aws::SDKOptions awsOptions; - -static void initAWS() -{ - CriticalBlock block(awsCS); - if (awsInitRefCount == 0) - { - // First initialization - awsOptions.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Warn; - Aws::InitAPI(awsOptions); - LOG(MCdebugProgress, "AWS SDK initialized for S3 file operations"); - } - - awsInitRefCount++; -} - -static void shutdownAWS() -{ - CriticalBlock block(awsCS); - if (awsInitRefCount == 1) - { - // Last reference - perform shutdown - // IMPORTANT: All S3Clients must be destroyed before this point - Aws::ShutdownAPI(awsOptions); - LOG(MCdebugProgress, "AWS SDK shutdown for S3 file operations"); - } - - assertex(awsInitRefCount > 0); - awsInitRefCount--; -} - -// S3 Client cache key based on plane and device -struct S3ClientKey -{ - std::string planeName; - unsigned device; - - bool operator==(const S3ClientKey& other) const - { - return 
(planeName == other.planeName) && (device == other.device); - } -}; - -// Hash specialization for S3ClientKey -namespace std { - template<> - struct hash - { - size_t operator()(const S3ClientKey& key) const noexcept - { - unsigned h = fnvInitialHash32; - if (key.planeName.c_str()) - h = hashc((const unsigned char*)key.planeName.c_str(), key.planeName.length(), h); - h = hashvalue(key.device, h); - return h; - } - }; -} - -class S3ClientManager -{ -private: - mutable CriticalSection cs; - std::unordered_map> clients; - bool initialized = false; - -public: - Aws::S3::S3Client& getClient(const char* planeName, unsigned device) - { - CriticalBlock block(cs); - if (!initialized) - { - initAWS(); - initialized = true; - } - - S3ClientKey key; - key.planeName = planeName; - key.device = device; - - if (auto it = clients.find(key); it != clients.end()) - { - return *(it->second); - } - else - { - std::unique_ptr client = createClient(planeName, device); - Aws::S3::S3Client& ref = *client; - clients.emplace(key, std::move(client)); - return ref; - } - } - - void cleanup() - { - CriticalBlock block(cs); - if (initialized) - { - // Destroy clients before shutting down AWS SDK - clients.clear(); - shutdownAWS(); - initialized = false; - } - } - - ~S3ClientManager() - { - cleanup(); - } - -private: - std::unique_ptr createClient(const char* planeName, unsigned device) - { - // Load plane configuration - Owned plane = getStoragePlaneConfig(planeName, true); - const IPropertyTree * storageapi = plane->queryPropTree("storageapi"); - if (!storageapi) - throw makeStringExceptionV(99, "No storage api defined for plane %s", planeName); - const char *type = storageapi->queryProp("@type"); - if (!type) - throw makeStringExceptionV(99, "No storage api type defined for plane %s", planeName); - if (!strieq(type, "s3")) - throw makeStringExceptionV(99, "Storage api type for plane %s is not s3", planeName); - - // Get bucket configuration by device index - VStringBuffer 
childPath("buckets[%u]", device); - const IPropertyTree * bucketInfo = storageapi->queryPropTree(childPath); - if (!bucketInfo) - throw makeStringExceptionV(99, "Missing bucket specification for device %u in plane %s", device, planeName); - - // Build AWS client configuration - Aws::Client::ClientConfiguration clientConfig; - - const char* regionStr = storageapi->queryProp("@region"); - if (regionStr && !isEmptyString(regionStr)) - clientConfig.region = regionStr; - - const char* endpointStr = storageapi->queryProp("@endpoint"); - if (endpointStr && !isEmptyString(endpointStr)) - clientConfig.endpointOverride = endpointStr; - - clientConfig.scheme = storageapi->getPropBool("@useSSL", true) ? Aws::Http::Scheme::HTTPS : Aws::Http::Scheme::HTTP; - clientConfig.connectTimeoutMs = storageapi->getPropInt("@timeoutMs", 30000); - clientConfig.requestTimeoutMs = clientConfig.connectTimeoutMs * 2; - clientConfig.retryStrategy = std::make_shared(storageapi->getPropInt("@maxRetries", defaultMaxRetries)); - - // Use secret-based credentials if provided, otherwise use default credential chain - const char* secretName = bucketInfo->queryProp("@secret"); - - if (secretName && !isEmptyString(secretName)) - { - StringBuffer accessKey, keyId; - getSecretValue(accessKey, "storage", secretName, "aws-access-key", true); - getSecretValue(keyId, "storage", secretName, "aws-key-id", true); - - auto credentials = Aws::Auth::AWSCredentials(keyId.str(), accessKey.str()); - return std::make_unique( - credentials, - clientConfig, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent, - true, // useVirtualAddressing - Aws::S3::US_EAST_1_REGIONAL_ENDPOINT_OPTION::NOT_SET); - } - else - { - // Environment variables or ~/.aws/credentials - auto credentialsProvider = std::make_shared(); - return std::make_unique( - credentialsProvider, - clientConfig, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent, - true, // useVirtualAddressing - 
Aws::S3::US_EAST_1_REGIONAL_ENDPOINT_OPTION::NOT_SET); - } - } -}; - -static S3ClientManager& getS3ClientManager() -{ - static S3ClientManager manager = S3ClientManager(); - return manager; -} - - -// Utility functions -static void logAwsError(const char* operation, const Aws::Client::AWSError& error, const char* bucket, const char* key) -{ - ERRLOG("S3 %s failed for s3://%s/%s: %s - %s", - operation, bucket, key, - error.GetExceptionName().c_str(), - error.GetMessage().c_str()); -} - -static void handleRequestBackoff(const char* message, unsigned attempt, unsigned maxRetries) -{ - OWARNLOG("%s", message); - - if (attempt >= maxRetries) - throw makeStringException(-1, message); - - // Exponential backoff with jitter - unsigned backoffMs = (1U << attempt) * 100 + (rand() % 100); - Sleep(backoffMs); -} - -static void handleRequestException(const Aws::Client::AWSError& e, const char* op, unsigned attempt, unsigned maxRetries, const char* filename, offset_t pos, offset_t len) -{ - VStringBuffer msg("%s failed (attempt %u/%u) for file %s at offset %llu, len %llu: %s - %s", - op, attempt, maxRetries, filename, pos, len, - e.GetExceptionName().c_str(), e.GetMessage().c_str()); - - handleRequestBackoff(msg, attempt, maxRetries); -} - -static void handleRequestException(const std::exception& e, const char* op, unsigned attempt, unsigned maxRetries, const char* filename, offset_t pos, offset_t len) -{ - VStringBuffer msg("%s failed (attempt %u/%u) for file %s at offset %llu, len %llu: %s", - op, attempt, maxRetries, filename, pos, len, e.what()); - - handleRequestBackoff(msg, attempt, maxRetries); -} - -static void handleRequestException(const Aws::Client::AWSError& e, const char* op, unsigned attempt, unsigned maxRetries, const char* filename) -{ - VStringBuffer msg("%s failed (attempt %u/%u) for file %s: %s - %s", - op, attempt, maxRetries, filename, - e.GetExceptionName().c_str(), e.GetMessage().c_str()); - - handleRequestBackoff(msg, attempt, maxRetries); -} - -static 
void handleRequestException(const std::exception& e, const char* op, unsigned attempt, unsigned maxRetries, const char* filename) -{ - VStringBuffer msg("%s failed (attempt %u/%u) for file %s: %s", - op, attempt, maxRetries, filename, e.what()); - - handleRequestBackoff(msg, attempt, maxRetries); -} - //--------------------------------------------------------------------------------------------------------------------- // Forward declarations class S3File; //--------------------------------------------------------------------------------------------------------------------- -// S3FileReadIO implementation +// S3FileReadIO + class S3FileReadIO : implements CInterfaceOf { -private: Linked file; FileIOStats stats; - CriticalSection ioCS; offset_t cachedFileSize; public: - S3FileReadIO(S3File* _file); - - // IFileIO interface - virtual size32_t read(offset_t pos, size32_t len, void* data) override; + S3FileReadIO(S3File * _file); + virtual size32_t read(offset_t pos, size32_t len, void * data) override; virtual offset_t size() override; - virtual void close() override { /* No-op for read operations */ } - virtual void flush() override { /* No-op for read operations */ } - - // Not implemented for read-only file - virtual size32_t write(offset_t pos, size32_t len, const void* data) override - { - throwUnexpected(); - } - virtual void setSize(offset_t size) override - { - throwUnexpected(); - } - - virtual unsigned __int64 getStatistic(StatisticKind kind) override; - virtual IFile* queryFile() const override; + virtual void close() override {} + virtual void flush() override {} + virtual size32_t write(offset_t pos, size32_t len, const void * data) override { throwUnexpected(); } + virtual void setSize(offset_t size) override { throwUnexpected(); } + virtual unsigned __int64 getStatistic(StatisticKind kind) override { return stats.getStatistic(kind); } + virtual IFile * queryFile() const override; private: - size32_t readFromS3(offset_t pos, size32_t len, void* data); + 
size32_t readFromS3(offset_t pos, size32_t len, void * data); }; //--------------------------------------------------------------------------------------------------------------------- -// Multipart upload helper +// S3MultipartUpload + class S3MultipartUpload { -private: StringAttr planeName; unsigned device; StringAttr bucket; @@ -354,78 +85,77 @@ class S3MultipartUpload bool active = false; public: - S3MultipartUpload(const char* _planeName, unsigned _device, const char* _bucket, const char* _key) + S3MultipartUpload(const char * _planeName, unsigned _device, const char * _bucket, const char * _key) : planeName(_planeName), device(_device), bucket(_bucket), key(_key) { fullPath.appendf("s3:%s/%s", _planeName, _key); } - ~S3MultipartUpload() { if (active) - abort(); + { + try { abort(); } + catch (...) { ERRLOG("Failed to abort S3 multipart upload for %s", fullPath.str()); } + } } - bool initiate(); - bool uploadPart(const void* data, size32_t len); - bool complete(); - bool abort(); + void initiate(); + void uploadPart(const void * data, size32_t len); + void complete(); + void abort(); private: - Aws::S3::S3Client& getClient() { return getS3ClientManager().getClient(planeName.str(), device); } + Aws::S3::S3Client & getClient() { return getS3Client(planeName.str(), device); } }; //--------------------------------------------------------------------------------------------------------------------- -// S3FileWriteIO implementation +// S3FileWriteIO + class S3FileWriteIO : implements CInterfaceOf { -private: + static constexpr size32_t minMultipartSize = 5 * 1024 * 1024; // 5MB — S3 minimum part size + static constexpr size32_t writeBufferSize = 8 * 1024 * 1024; // 8MB — flush threshold + Linked file; FileIOStats stats; std::unique_ptr multipartUpload; + MemoryBuffer pending; CriticalSection ioCS; bool closed = false; offset_t currentPos = 0; public: - S3FileWriteIO(S3File* _file); + S3FileWriteIO(S3File * _file); virtual void beforeDispose() override; - - // 
IFileIO interface - virtual size32_t write(offset_t pos, size32_t len, const void* data) override; + virtual size32_t write(offset_t pos, size32_t len, const void * data) override; virtual void close() override; virtual void flush() override; - virtual void setSize(offset_t size) override { /* Not supported for S3 */ } + virtual void setSize(offset_t size) override {} + virtual size32_t read(offset_t pos, size32_t len, void * data) override { throwUnexpected(); } + virtual offset_t size() override { throwUnexpected(); } + virtual unsigned __int64 getStatistic(StatisticKind kind) override { return stats.getStatistic(kind); } + virtual IFile * queryFile() const override; - // Not implemented for write-only file - virtual size32_t read(offset_t pos, size32_t len, void* data) override - { - throwUnexpected(); - } - virtual offset_t size() override - { - throwUnexpected(); - } - - virtual unsigned __int64 getStatistic(StatisticKind kind) override; - virtual IFile* queryFile() const override; +private: + void flushPending(); + void putObject(const void * data, size32_t len); }; //--------------------------------------------------------------------------------------------------------------------- -// S3File implementation +// S3File + class S3File : implements CInterfaceOf { friend class S3FileReadIO; friend class S3FileWriteIO; + friend class S3DirectoryIterator; -private: StringBuffer fullName; StringBuffer planeName; StringBuffer bucketName; StringBuffer keyName; unsigned device = 1; - // Cached metadata mutable CriticalSection metaCS; mutable bool haveMeta = false; mutable bool fileExists = false; @@ -434,360 +164,202 @@ class S3File : implements CInterfaceOf mutable time_t modifiedTime = 0; public: - S3File(const char* s3FileName); + S3File(const char * s3FileName); - // IFile interface - query methods - virtual const char* queryFilename() override { return fullName.str(); } + virtual const char * queryFilename() override { return fullName.str(); } virtual bool 
exists() override; virtual fileBool isDirectory() override; virtual fileBool isFile() override; - virtual fileBool isReadOnly() override { return fileBool::foundYes; } // S3 files are read-only via this interface + virtual fileBool isReadOnly() override { return fileBool::foundYes; } virtual offset_t size() override; - virtual bool getTime(CDateTime* createTime, CDateTime* modifiedTime, CDateTime* accessedTime) override; - virtual bool getInfo(bool& isdir, offset_t& size, CDateTime& modtime) override; - - // IFile interface - I/O operations - virtual IFileIO* open(IFOmode mode, IFEflags extraFlags = IFEnone) override; - virtual IFileAsyncIO* openAsync(IFOmode mode) override { UNIMPLEMENTED; } - virtual IFileIO* openShared(IFOmode mode, IFSHmode shmode, IFEflags extraFlags = IFEnone) override; - - // IFile interface - modification operations + virtual bool getTime(CDateTime * createTime, CDateTime * modifiedTime, CDateTime * accessedTime) override; + virtual bool getInfo(bool & isdir, offset_t & size, CDateTime & modtime) override; + virtual IFileIO * open(IFOmode mode, IFEflags extraFlags = IFEnone) override; + virtual IFileAsyncIO * openAsync(IFOmode mode) override { UNIMPLEMENTED; } + virtual IFileIO * openShared(IFOmode mode, IFSHmode shmode, IFEflags extraFlags = IFEnone) override; virtual bool remove() override; - virtual bool createDirectory() override; - - // Not implemented operations - virtual bool setTime(const CDateTime* createTime, const CDateTime* modifiedTime, const CDateTime* accessedTime) override { UNIMPLEMENTED; } - virtual void rename(const char* newTail) override { UNIMPLEMENTED; } - virtual void move(const char* newName) override { UNIMPLEMENTED; } - virtual void setReadOnly(bool ro) override { UNIMPLEMENTED; } - virtual void setFilePermissions(unsigned fPerms) override { UNIMPLEMENTED; } - virtual bool setCompression(bool set) override { UNIMPLEMENTED; } + virtual bool createDirectory() override { return true; } + virtual IDirectoryIterator * 
directoryFiles(const char * mask, bool sub, bool includeDirs) override; + + // Not applicable to S3 + virtual bool setTime(const CDateTime *, const CDateTime *, const CDateTime *) override { UNIMPLEMENTED; } + virtual void rename(const char *) override { UNIMPLEMENTED; } + virtual void move(const char *) override { UNIMPLEMENTED; } + virtual void setReadOnly(bool) override { UNIMPLEMENTED; } + virtual void setFilePermissions(unsigned) override { UNIMPLEMENTED; } + virtual bool setCompression(bool) override { UNIMPLEMENTED; } virtual offset_t compressedSize() override { UNIMPLEMENTED; } virtual unsigned getCRC() override { UNIMPLEMENTED; } - virtual void setCreateFlags(unsigned short cflags) override { UNIMPLEMENTED; } - virtual void setShareMode(IFSHmode shmode) override { UNIMPLEMENTED; } - virtual IDirectoryIterator* directoryFiles(const char* mask, bool sub, bool includeDirs) override { UNIMPLEMENTED; } - virtual IDirectoryDifferenceIterator* monitorDirectory(IDirectoryIterator* prev, const char* mask, bool sub, bool includedirs, unsigned checkinterval, unsigned timeout, Semaphore* abortsem) override { UNIMPLEMENTED; } - virtual void copySection(const RemoteFilename& dest, offset_t toOfs, offset_t fromOfs, offset_t size, ICopyFileProgress* progress, CFflags copyFlags) override { UNIMPLEMENTED; } - virtual void copyTo(IFile* dest, size32_t buffersize, ICopyFileProgress* progress, bool usetmp, CFflags copyFlags) override { UNIMPLEMENTED; } - virtual IMemoryMappedFile* openMemoryMapped(offset_t ofs, memsize_t len, bool write) override { UNIMPLEMENTED; } + virtual void setCreateFlags(unsigned short) override { UNIMPLEMENTED; } + virtual void setShareMode(IFSHmode) override { UNIMPLEMENTED; } + virtual IDirectoryDifferenceIterator * monitorDirectory(IDirectoryIterator *, const char *, bool, bool, unsigned, unsigned, Semaphore *) override { UNIMPLEMENTED; } + virtual void copySection(const RemoteFilename &, offset_t, offset_t, offset_t, ICopyFileProgress *, CFflags) 
override { UNIMPLEMENTED; } + virtual void copyTo(IFile *, size32_t, ICopyFileProgress *, bool, CFflags) override { UNIMPLEMENTED; } + virtual IMemoryMappedFile * openMemoryMapped(offset_t, memsize_t, bool) override { UNIMPLEMENTED; } protected: void ensureMetadata() const; void gatherMetadata() const; void invalidateMeta() { CriticalBlock block(metaCS); haveMeta = false; } - Aws::S3::S3Client& getClient() const { return getS3ClientManager().getClient(planeName.str(), device); } + Aws::S3::S3Client & getClient() const { return getS3Client(planeName.str(), device); } }; //--------------------------------------------------------------------------------------------------------------------- -// Implementation of S3FileReadIO +// S3FileReadIO implementation -S3FileReadIO::S3FileReadIO(S3File* _file) - : file(_file), cachedFileSize(_file->size()) -{ -} +S3FileReadIO::S3FileReadIO(S3File * _file) + : file(_file), cachedFileSize(_file->size()) {} -size32_t S3FileReadIO::read(offset_t pos, size32_t len, void* data) +size32_t S3FileReadIO::read(offset_t pos, size32_t len, void * data) { if (pos >= cachedFileSize) return 0; - if (pos + len > cachedFileSize) len = (size32_t)(cachedFileSize - pos); - if (len == 0) return 0; - - CriticalBlock block(ioCS); - size32_t bytesRead = readFromS3(pos, len, data); stats.ioReads++; stats.ioReadBytes += bytesRead; return bytesRead; } -size32_t S3FileReadIO::readFromS3(offset_t pos, size32_t len, void* data) +size32_t S3FileReadIO::readFromS3(offset_t pos, size32_t len, void * data) { - unsigned attempt = 0; - size32_t bytesRead = 0; - const char* filename = file->queryFilename(); - + const char * filename = file->queryFilename(); CCycleTimer timer; - - for (;;) + size32_t bytesRead = retryS3("S3File::read", filename, [&]() -> size32_t { - try + Aws::S3::Model::GetObjectRequest request; + request.SetBucket(file->bucketName.str()); + request.SetKey(file->keyName.str()); + if (pos > 0 || len != cachedFileSize) { - 
Aws::S3::Model::GetObjectRequest request; - request.SetBucket(file->bucketName.str()); - request.SetKey(file->keyName.str()); - - // Only set range header for partial reads - if (pos > 0 || len != cachedFileSize) - { - StringBuffer range; - range.appendf("bytes=%llu-%llu", (unsigned long long)pos, (unsigned long long)(pos + len - 1)); - request.SetRange(range.str()); - } - - auto outcome = file->getClient().GetObject(request); - - if (outcome.IsSuccess()) - { - auto& body = outcome.GetResult().GetBody(); - body.read((char*)data, len); - bytesRead = (size32_t)body.gcount(); - break; - } - else - { - attempt++; - handleRequestException(outcome.GetError(), "S3File::read", attempt, defaultMaxRetries, filename, pos, len); - } + VStringBuffer range("bytes=%llu-%llu", (unsigned long long)pos, (unsigned long long)(pos + len - 1)); + request.SetRange(range.str()); } - catch (const std::exception& e) + auto outcome = file->getClient().GetObject(request); + if (!outcome.IsSuccess()) { - attempt++; - handleRequestException(e, "S3File::read", attempt, defaultMaxRetries, filename, pos, len); + auto & error = outcome.GetError(); + VStringBuffer msg("S3File::read failed for %s: %s - %s", filename, + error.GetExceptionName().c_str(), error.GetMessage().c_str()); + throw std::runtime_error(msg.str()); } - } - + auto & body = outcome.GetResult().GetBody(); + body.read((char *)data, len); + return (size32_t)body.gcount(); + }); stats.ioReadCycles += timer.elapsedCycles(); return bytesRead; } -offset_t S3FileReadIO::size() -{ - return file->size(); -} - -unsigned __int64 S3FileReadIO::getStatistic(StatisticKind kind) -{ - return stats.getStatistic(kind); -} - -IFile* S3FileReadIO::queryFile() const -{ - return file.get(); -} +offset_t S3FileReadIO::size() { return file->size(); } +IFile * S3FileReadIO::queryFile() const { return file.get(); } //--------------------------------------------------------------------------------------------------------------------- -// Implementation of 
S3MultipartUpload +// S3MultipartUpload implementation -bool S3MultipartUpload::initiate() +void S3MultipartUpload::initiate() { - unsigned attempt = 0; - - // Reserve space for estimated parts (assume average 200 parts for large files) completedParts.reserve(200); - - for (;;) + retryS3Op("S3::CreateMultipartUpload", fullPath.str(), [&]() { - try - { - Aws::S3::Model::CreateMultipartUploadRequest request; - request.SetBucket(bucket.str()); - request.SetKey(key.str()); - - auto outcome = getClient().CreateMultipartUpload(request); - if (outcome.IsSuccess()) - { - uploadId.set(outcome.GetResult().GetUploadId().c_str()); - active = true; - return true; - } - else - { - attempt++; - handleRequestException(outcome.GetError(), "S3File::initiate", attempt, defaultMaxRetries, fullPath.str()); - } - } - catch (const std::exception& e) - { - attempt++; - handleRequestException(e, "S3File::initiate", attempt, defaultMaxRetries, fullPath.str()); - } - } -} - -bool S3MultipartUpload::uploadPart(const void* data, size32_t len) -{ - if (!active) - return false; - - unsigned attempt = 0; - - for (;;) - { - try - { - Aws::S3::Model::UploadPartRequest request; - request.SetBucket(bucket.str()); - request.SetKey(key.str()); - request.SetUploadId(uploadId.str()); - request.SetPartNumber(partNumber); - - Aws::Utils::Stream::PreallocatedStreamBuf buf((unsigned char*)data, len); - request.SetBody(std::make_shared(&buf)); - - auto outcome = getClient().UploadPart(request); - if (outcome.IsSuccess()) - { - Aws::S3::Model::CompletedPart completedPart; - completedPart.SetPartNumber(partNumber); - completedPart.SetETag(outcome.GetResult().GetETag()); - completedParts.push_back(completedPart); - partNumber++; - return true; - } - else - { - attempt++; - handleRequestException(outcome.GetError(), "S3File::uploadPart", attempt, defaultMaxRetries, fullPath.str(), 0, len); - } - } - catch (const std::exception& e) + Aws::S3::Model::CreateMultipartUploadRequest request; + 
request.SetBucket(bucket.str()); + request.SetKey(key.str()); + auto outcome = getClient().CreateMultipartUpload(request); + if (outcome.IsSuccess()) + uploadId.set(outcome.GetResult().GetUploadId().c_str()); + return outcome; + }); + active = true; +} + +void S3MultipartUpload::uploadPart(const void * data, size32_t len) +{ + assertex(active); + retryS3Op("S3::UploadPart", fullPath.str(), [&]() + { + Aws::S3::Model::UploadPartRequest request; + request.SetBucket(bucket.str()); + request.SetKey(key.str()); + request.SetUploadId(uploadId.str()); + request.SetPartNumber(partNumber); + auto buf = Aws::New("s3", (unsigned char *)data, len); + request.SetBody(Aws::MakeShared("s3", buf)); + request.SetContentLength(len); + auto outcome = getClient().UploadPart(request); + if (outcome.IsSuccess()) { - attempt++; - handleRequestException(e, "S3File::uploadPart", attempt, defaultMaxRetries, fullPath.str(), 0, len); + Aws::S3::Model::CompletedPart cp; + cp.SetPartNumber(partNumber); + cp.SetETag(outcome.GetResult().GetETag()); + completedParts.push_back(cp); + partNumber++; } - } + return outcome; + }); } -bool S3MultipartUpload::complete() +void S3MultipartUpload::complete() { - if (!active) - return false; - - unsigned attempt = 0; - - for (;;) + if (!active) return; + retryS3Op("S3::CompleteMultipartUpload", fullPath.str(), [&]() { - try - { - Aws::S3::Model::CompletedMultipartUpload completedUpload; - completedUpload.SetParts(completedParts); - - Aws::S3::Model::CompleteMultipartUploadRequest request; - request.SetBucket(bucket.str()); - request.SetKey(key.str()); - request.SetUploadId(uploadId.str()); - request.SetMultipartUpload(completedUpload); - - auto outcome = getClient().CompleteMultipartUpload(request); - if (outcome.IsSuccess()) - { - active = false; - return true; - } - else - { - attempt++; - handleRequestException(outcome.GetError(), "S3File::complete", attempt, defaultMaxRetries, fullPath.str()); - } - } - catch (const std::exception& e) - { - attempt++; - 
handleRequestException(e, "S3File::complete", attempt, defaultMaxRetries, fullPath.str()); - } - } + Aws::S3::Model::CompletedMultipartUpload completedUpload; + completedUpload.SetParts(completedParts); + Aws::S3::Model::CompleteMultipartUploadRequest request; + request.SetBucket(bucket.str()); + request.SetKey(key.str()); + request.SetUploadId(uploadId.str()); + request.SetMultipartUpload(completedUpload); + return getClient().CompleteMultipartUpload(request); + }); + active = false; } -bool S3MultipartUpload::abort() +void S3MultipartUpload::abort() { - if (!active) - return true; - - unsigned attempt = 0; - - for (;;) + if (!active) return; + retryS3Op("S3::AbortMultipartUpload", fullPath.str(), [&]() { - try - { - Aws::S3::Model::AbortMultipartUploadRequest request; - request.SetBucket(bucket.str()); - request.SetKey(key.str()); - request.SetUploadId(uploadId.str()); - - auto outcome = getClient().AbortMultipartUpload(request); - active = false; - - if (outcome.IsSuccess()) - { - return true; - } - else - { - attempt++; - handleRequestException(outcome.GetError(), "S3File::abort", attempt, defaultMaxRetries, fullPath.str()); - } - } - catch (const std::exception& e) - { - attempt++; - handleRequestException(e, "S3File::abort", attempt, defaultMaxRetries, fullPath.str()); - } - } + Aws::S3::Model::AbortMultipartUploadRequest request; + request.SetBucket(bucket.str()); + request.SetKey(key.str()); + request.SetUploadId(uploadId.str()); + return getClient().AbortMultipartUpload(request); + }); + active = false; } //--------------------------------------------------------------------------------------------------------------------- -// Implementation of S3FileWriteIO +// S3FileWriteIO implementation -S3FileWriteIO::S3FileWriteIO(S3File* _file) - : file(_file) -{ -} +S3FileWriteIO::S3FileWriteIO(S3File * _file) : file(_file) {} void S3FileWriteIO::beforeDispose() { - try - { - close(); - } - catch (IException* e) - { - StringBuffer msg; - e->errorMessage(msg); - 
ERRLOG("Exception during S3 file disposal: %s", msg.str()); - e->Release(); - } - catch (...) - { - ERRLOG("Unknown exception during S3 file disposal for %s", file->queryFilename()); - } + try { close(); } + catch (IException * e) { StringBuffer msg; e->errorMessage(msg); ERRLOG("S3 file disposal: %s", msg.str()); e->Release(); } + catch (...) { ERRLOG("S3 file disposal failed for %s", file->queryFilename()); } } -size32_t S3FileWriteIO::write(offset_t pos, size32_t len, const void* data) +size32_t S3FileWriteIO::write(offset_t pos, size32_t len, const void * data) { if (closed) throw makeStringException(-1, "Attempt to write to closed S3 file"); - if (len == 0) return 0; - CriticalBlock block(ioCS); - - // For simplicity, require sequential writes if (pos != currentPos) throw makeStringException(-1, "S3 file writer only supports sequential writes"); - file->invalidateMeta(); - CCycleTimer timer; - - // Initiate multipart upload on first write - if (!multipartUpload) - { - multipartUpload = std::make_unique(file->planeName.str(), file->device, file->bucketName.str(), file->keyName.str()); - if (!multipartUpload->initiate()) - throw makeStringException(-1, "Failed to initiate multipart upload"); - } - - if (!multipartUpload->uploadPart(data, len)) - throw makeStringException(-1, "Failed to upload part to S3"); - + pending.append(len, data); currentPos += len; + while (pending.length() >= writeBufferSize) + flushPending(); stats.ioWrites++; stats.ioWriteBytes += len; stats.ioWriteCycles += timer.elapsedCycles(); @@ -797,75 +369,98 @@ size32_t S3FileWriteIO::write(offset_t pos, size32_t len, const void* data) void S3FileWriteIO::flush() { CriticalBlock block(ioCS); - // For S3, flush is essentially a no-op since we write directly - // Data is already sent to S3 in write() calls + if (pending.length() >= minMultipartSize) + flushPending(); } void S3FileWriteIO::close() { - if (closed) - return; - CriticalBlock block(ioCS); - + if (closed) return; if (multipartUpload) { 
- if (!multipartUpload->complete()) - throw makeStringException(-1, "Failed to complete multipart upload"); + if (pending.length()) + { + multipartUpload->uploadPart(pending.toByteArray(), pending.length()); + pending.clear(); + } + multipartUpload->complete(); multipartUpload.reset(); } - + else + { + putObject(pending.toByteArray(), pending.length()); + pending.clear(); + } closed = true; } - - -unsigned __int64 S3FileWriteIO::getStatistic(StatisticKind kind) +void S3FileWriteIO::flushPending() { - return stats.getStatistic(kind); + if (pending.length() == 0) return; + if (!multipartUpload) + { + multipartUpload = std::make_unique(file->planeName.str(), file->device, file->bucketName.str(), file->keyName.str()); + multipartUpload->initiate(); + } + size32_t flushLen = std::min(pending.length(), writeBufferSize); + multipartUpload->uploadPart(pending.toByteArray(), flushLen); + if (flushLen == pending.length()) + pending.clear(); + else + { + size32_t remaining = pending.length() - flushLen; + memmove((void *)pending.toByteArray(), pending.toByteArray() + flushLen, remaining); + pending.setLength(remaining); + } } -IFile* S3FileWriteIO::queryFile() const +void S3FileWriteIO::putObject(const void * data, size32_t len) { - return file.get(); + retryS3Op("S3::PutObject", file->queryFilename(), [&]() + { + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(file->bucketName.str()); + request.SetKey(file->keyName.str()); + auto buf = Aws::New("s3", + reinterpret_cast(const_cast(data)), len); + request.SetBody(Aws::MakeShared("s3", buf)); + request.SetContentLength(len); + return file->getClient().PutObject(request); + }); } +IFile * S3FileWriteIO::queryFile() const { return file.get(); } + //--------------------------------------------------------------------------------------------------------------------- -// Implementation of S3File +// S3File implementation -S3File::S3File(const char* s3FileName) - : fullName(s3FileName) +S3File::S3File(const char * 
s3FileName) : fullName(s3FileName) { if (!startsWith(fullName, s3FilePrefix)) throw makeStringExceptionV(99, "Unexpected prefix on S3 filename %s", fullName.str()); - //format is s3:plane[/device]/path const char * filename = fullName.str() + strlen(s3FilePrefix); const char * slash = strchr(filename, '/'); if (!slash) throw makeStringException(99, "Missing / in s3: file reference"); - planeName.append(slash-filename, filename); + planeName.append(slash - filename, filename); Owned plane = getStoragePlaneConfig(planeName, true); - filename = slash+1; // advance past slash + filename = slash + 1; unsigned numDevices = plane->getPropInt("@numDevices", 1); if (numDevices != 1) { - //The device from the path is used to identify which device is in use - //but it is then stripped from the path if (filename[0] != 'd') throw makeStringExceptionV(99, "Expected a device number in the filename %s", fullName.str()); - char * endDevice = nullptr; - device = strtol(filename+1, &endDevice, 10); + device = strtol(filename + 1, &endDevice, 10); if ((device == 0) || (device > numDevices)) throw makeStringExceptionV(99, "Device %d out of range for plane %s", device, planeName.str()); - if (!endDevice || (*endDevice != '/')) throw makeStringExceptionV(99, "Unexpected end of device partition %s", fullName.str()); - - filename = endDevice+1; + filename = endDevice + 1; } getClient(); // validate plane and device @@ -874,52 +469,36 @@ S3File::S3File(const char* s3FileName) const char * bucket = plane->queryPropTree(childPath)->queryProp("@name"); if (isEmptyString(bucket)) throw makeStringExceptionV(99, "Missing bucket name for plane %s", planeName.str()); - bucketName.set(bucket); keyName.set(filename); } -bool S3File::exists() -{ - ensureMetadata(); - return fileExists; -} +bool S3File::exists() { ensureMetadata(); return fileExists; } fileBool S3File::isDirectory() { ensureMetadata(); - if (!fileExists) - return fileBool::notFound; - return isDir ? 
fileBool::foundYes : fileBool::foundNo; + return !fileExists ? fileBool::notFound : (isDir ? fileBool::foundYes : fileBool::foundNo); } fileBool S3File::isFile() { ensureMetadata(); - if (!fileExists) - return fileBool::notFound; - return !isDir ? fileBool::foundYes : fileBool::foundNo; + return !fileExists ? fileBool::notFound : (!isDir ? fileBool::foundYes : fileBool::foundNo); } -offset_t S3File::size() -{ - ensureMetadata(); - return fileSize; -} +offset_t S3File::size() { ensureMetadata(); return fileSize; } -bool S3File::getTime(CDateTime* createTime, CDateTime* modifiedTime, CDateTime* accessedTime) +bool S3File::getTime(CDateTime * createTime, CDateTime * modifiedTime, CDateTime * accessedTime) { ensureMetadata(); - if (createTime) - createTime->set(this->modifiedTime); - if (modifiedTime) - modifiedTime->set(this->modifiedTime); - if (accessedTime) - accessedTime->clear(); + if (createTime) createTime->set(this->modifiedTime); + if (modifiedTime) modifiedTime->set(this->modifiedTime); + if (accessedTime) accessedTime->clear(); return fileExists; } -bool S3File::getInfo(bool& isdir, offset_t& size, CDateTime& modtime) +bool S3File::getInfo(bool & isdir, offset_t & size, CDateTime & modtime) { ensureMetadata(); isdir = this->isDir; @@ -928,204 +507,230 @@ bool S3File::getInfo(bool& isdir, offset_t& size, CDateTime& modtime) return fileExists; } -IFileIO* S3File::open(IFOmode mode, IFEflags extraFlags) +IFileIO * S3File::open(IFOmode mode, IFEflags extraFlags) { switch (mode) { - case IFOread: - if (!exists()) - return nullptr; - return new S3FileReadIO(this); + case IFOread: return exists() ? 
new S3FileReadIO(this) : nullptr; case IFOcreate: - case IFOwrite: - return new S3FileWriteIO(this); - default: - throw makeStringException(-1, "Unsupported file open mode for S3 file"); + case IFOwrite: return new S3FileWriteIO(this); + default: throw makeStringException(-1, "Unsupported file open mode for S3 file"); } } -IFileIO* S3File::openShared(IFOmode mode, IFSHmode shmode, IFEflags extraFlags) +IFileIO * S3File::openShared(IFOmode mode, IFSHmode, IFEflags extraFlags) { - return open(mode, extraFlags); // S3 files are inherently shared + return open(mode, extraFlags); } bool S3File::remove() { try { - Aws::S3::Model::DeleteObjectRequest request; - request.SetBucket(bucketName.str()); - request.SetKey(keyName.str()); - - auto outcome = getClient().DeleteObject(request); - if (outcome.IsSuccess()) + retryS3Op("S3::DeleteObject", fullName.str(), [&]() { - CriticalBlock block(metaCS); - haveMeta = true; - fileExists = false; - fileSize = 0; - return true; - } - else - { - logAwsError("DeleteObject", outcome.GetError(), bucketName.str(), keyName.str()); - return false; - } + Aws::S3::Model::DeleteObjectRequest request; + request.SetBucket(bucketName.str()); + request.SetKey(keyName.str()); + return getClient().DeleteObject(request); + }); + CriticalBlock block(metaCS); + haveMeta = true; + fileExists = false; + fileSize = 0; + return true; } - catch (const std::exception& e) + catch (IException * e) + { + /* jlib exceptions arrive by pointer; catch (...) alone would drop the message and leak the object */ + StringBuffer msg; + e->errorMessage(msg); + ERRLOG("S3 DeleteObject failed for %s: %s", fullName.str(), msg.str()); + e->Release(); + return false; + } + catch (const std::exception & e) + { + ERRLOG("S3 DeleteObject failed for %s: %s", fullName.str(), e.what()); + return false; + } + catch (...)
{ - ERRLOG("Exception in S3 file deletion: %s", e.what()); + ERRLOG("S3 DeleteObject failed for %s", fullName.str()); return false; } } -bool S3File::createDirectory() -{ - // For S3, we don't need to create directory markers explicitly - // S3 is a flat namespace where directories are just key prefixes - // Directory markers are optional and not required for file operations - // When we write the file, S3 will automatically handle the key structure - - return true; -} - void S3File::ensureMetadata() const { CriticalBlock block(metaCS); - if (haveMeta) - return; + if (haveMeta) return; gatherMetadata(); } void S3File::gatherMetadata() const { - unsigned attempt = 0; - - for (;;) + retryS3("S3File::gatherMetadata", fullName.str(), [&]() { - try + Aws::S3::Model::HeadObjectRequest request; + request.SetBucket(bucketName.str()); + request.SetKey(keyName.str()); + auto outcome = getClient().HeadObject(request); + if (outcome.IsSuccess()) { - Aws::S3::Model::HeadObjectRequest request; - request.SetBucket(bucketName.str()); - request.SetKey(keyName.str()); - - auto outcome = getClient().HeadObject(request); - if (outcome.IsSuccess()) + fileExists = true; + fileSize = outcome.GetResult().GetContentLength(); + modifiedTime = outcome.GetResult().GetLastModified().Seconds(); + isDir = false; + } + else + { + auto & error = outcome.GetError(); + if (error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY || + error.GetErrorType() == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || + error.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { - fileExists = true; - fileSize = outcome.GetResult().GetContentLength(); - modifiedTime = outcome.GetResult().GetLastModified().Seconds(); - isDir = false; // S3 objects are not directories in the traditional sense - break; + fileExists = false; + fileSize = 0; + modifiedTime = 0; + isDir = false; } else { - auto& error = outcome.GetError(); - if (error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY || - error.GetErrorType() == 
Aws::S3::S3Errors::RESOURCE_NOT_FOUND) - { - // Object not found - not an error to retry - fileExists = false; - fileSize = 0; - modifiedTime = 0; - isDir = false; - break; - } - attempt++; - handleRequestException(error, "S3File::gatherMetaData", attempt, defaultMaxRetries, fullName.str()); + VStringBuffer msg("HeadObject failed for %s: %s - %s", fullName.str(), + error.GetExceptionName().c_str(), error.GetMessage().c_str()); + throw std::runtime_error(msg.str()); } } - catch (const IException& e) - { - throw; // Re-throw IException from handleRequestException - } - catch (const std::exception& e) - { - attempt++; - handleRequestException(e, "S3File::gatherMetaData", attempt, defaultMaxRetries, fullName.str()); - } - } - + }); haveMeta = true; } //--------------------------------------------------------------------------------------------------------------------- -// File hook implementation +// S3DirectoryIterator - -extern S3FILE_API IFile *createS3File(const char *s3FileName) +class S3DirectoryIterator : implements IDirectoryIterator, public CInterface { - return new S3File(s3FileName); -} +public: + IMPLEMENT_IINTERFACE; -extern S3FILE_API bool isS3FileName(const char *fileName) -{ - if (!fileName || !startsWith(fileName, s3FilePrefix)) - return false; + S3DirectoryIterator(S3File & _owner, const char * _mask, bool _sub, bool _includeDirs) + : owner(&_owner), mask(_mask), sub(_sub), includeDirs(_includeDirs) {} - const char *planeName = fileName + s3FilePrefixLen; - const char *slash = strchr(planeName, '/'); - // Require: - // - a non-empty plane name (slash not at the start) - // - a slash separating plane name from path - // - content after the slash - return (slash != nullptr && slash != planeName && *(slash + 1) != '\0'); -} + virtual bool first() override { index = 0; entries.kill(); fetchEntries(); return isValid(); } + virtual bool next() override { index++; return isValid(); } + virtual bool isValid() override { return index < entries.ordinality(); } 
-class S3FileHook : public CInterfaceOf -{ -public: - virtual IFile* createIFile(const char* fileName) override + virtual IFile & query() override { - if (isS3FileName(fileName)) - return createS3File(fileName); - return nullptr; + Entry & e = entries.item(index); + if (!e.file) + { + /* Reuse the owner's own "s3:<plane>/[d<N>/]" prefix (its full name minus its key) so that entries on + multi-device planes keep the device segment the S3File constructor requires. + Assumes queryFilename() returns the full s3: name given to the constructor - confirm. */ + const char * ownerName = owner->queryFilename(); + size_t ownerLen = strlen(ownerName); + size_t ownerKeyLen = strlen(owner->keyName.str()); + assertex(ownerLen >= ownerKeyLen); + StringBuffer path; + path.append(ownerLen - ownerKeyLen, ownerName).append(e.key); + e.file.setown(createS3File(path.str())); + } + return *e.file; } - virtual IAPICopyClient* getCopyApiClient(IStorageApiInfo* source, IStorageApiInfo* target) override + virtual StringBuffer & getName(StringBuffer & buf) override { - return nullptr; + const char * key = entries.item(index).key.str(); + size_t keyLen = strlen(key); + if (keyLen > 0 && key[keyLen - 1] == '/') + keyLen--; + const char * slash = nullptr; + for (size_t i = keyLen; i > 0; i--) + if (key[i - 1] == '/') { slash = key + i - 1; break; } + return slash ? buf.append(keyLen - (slash + 1 - key), slash + 1) : buf.append(keyLen, key); } -}; -static S3FileHook* s3FileHook = nullptr; -static CriticalSection hookCS; + virtual bool isDir() override { return entries.item(index).isDir; } + virtual __int64 getFileSize() override { return entries.item(index).size; } + virtual bool getModifiedTime(CDateTime & ret) override + { + ret.clear(); + time_t t = entries.item(index).modified; + if (t) ret.set(t); + return t != 0; + } -//--------------------------------------------------------------------------------------------------------------------- -// Exported functions +private: + struct Entry : public CInterface + { + StringAttr key; + Owned file; + offset_t size = 0; + time_t modified = 0; + bool isDir = false; + }; -extern S3FILE_API void installFileHook() -{ - CriticalBlock block(hookCS); - if (!s3FileHook) + void fetchEntries() { - s3FileHook = new S3FileHook; - addContainedFileHook(s3FileHook); + StringBuffer prefix(owner->keyName); + if (prefix.length() && prefix.charAt(prefix.length() - 1) != '/') + prefix.append('/'); + +
VStringBuffer listCtx("s3://%s/%s", owner->bucketName.str(), prefix.str()); + Aws::String continuationToken; + bool hasMore = true; + while (hasMore) + { + Aws::S3::Model::ListObjectsV2Request request; + request.SetBucket(owner->bucketName.str()); + request.SetPrefix(prefix.str()); + if (!sub) request.SetDelimiter("/"); + if (!continuationToken.empty()) request.SetContinuationToken(continuationToken); + + auto outcome = retryS3("S3::ListObjectsV2", listCtx.str(), [&]() + { + auto o = owner->getClient().ListObjectsV2(request); + if (!o.IsSuccess()) + { + auto & error = o.GetError(); + VStringBuffer msg("ListObjectsV2 failed for %s: %s - %s", listCtx.str(), + error.GetExceptionName().c_str(), error.GetMessage().c_str()); + throw std::runtime_error(msg.str()); + } + return o; + }); + + auto & result = outcome.GetResult(); + for (auto & obj : result.GetContents()) + { + const Aws::String & key = obj.GetKey(); + if (key.length() == (size_t)prefix.length()) continue; + const char * name = key.c_str() + prefix.length(); + if (mask.length() && !WildMatch(name, mask, false)) continue; + Entry * e = new Entry; + e->key.set(key.c_str()); + e->size = obj.GetSize(); + e->modified = obj.GetLastModified().Seconds(); + entries.append(*e); + } + if (includeDirs && !sub) + { + for (auto & cp : result.GetCommonPrefixes()) + { + Entry * e = new Entry; + e->key.set(cp.GetPrefix().c_str()); + e->isDir = true; + entries.append(*e); + } + } + hasMore = result.GetIsTruncated(); + continuationToken = result.GetNextContinuationToken(); + } } -} -extern S3FILE_API void removeFileHook() + Linked owner; + StringAttr mask; + bool sub; + bool includeDirs; + unsigned index = 0; + CIArrayOf entries; +}; + +IDirectoryIterator * S3File::directoryFiles(const char * mask, bool sub, bool includeDirs) { - CriticalBlock block(hookCS); - if (s3FileHook) - { - removeContainedFileHook(s3FileHook); - delete s3FileHook; - s3FileHook = nullptr; - } + return new S3DirectoryIterator(*this, mask ? 
mask : "", sub, includeDirs); } -MODULE_INIT(INIT_PRIORITY_STANDARD) +//--------------------------------------------------------------------------------------------------------------------- +// Exported functions + +extern S3FILE_API IFile * createS3File(const char * s3FileName) { - return true; + return new S3File(s3FileName); } -MODULE_EXIT() +extern S3FILE_API bool isS3FileName(const char * fileName) { - if (s3FileHook) - { - removeContainedFileHook(s3FileHook); - delete s3FileHook; - s3FileHook = nullptr; - } + /* Accept only "s3:<plane>/..." with a non-empty plane name: the S3File constructor looks the plane up by + that name, so an empty one can never be opened. The path after the slash may be empty. */ + if (isEmptyString(fileName) || !startsWith(fileName, s3FilePrefix)) + return false; + const char * planeName = fileName + s3FilePrefixLen; + const char * slash = strchr(planeName, '/'); + return (slash != nullptr) && (slash != planeName); } diff --git a/common/remote/hooks/s3/s3file.hpp b/common/remote/hooks/s3/s3file.hpp index a891e54cfcd..a127599879f 100644 --- a/common/remote/hooks/s3/s3file.hpp +++ b/common/remote/hooks/s3/s3file.hpp @@ -1,6 +1,6 @@ /*############################################################################## - HPCC SYSTEMS software Copyright (C) 2025 HPCC Systems®. + HPCC SYSTEMS software Copyright (C) 2026 HPCC Systems®. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ #define S3FILE_HPP #include "jfile.hpp" -#include "jptree.hpp" #ifdef S3FILE_EXPORTS #define S3FILE_API DECL_EXPORT @@ -28,27 +27,17 @@ #endif /* - * Modern S3 file access implementation using storage planes + * S3 file access via storage planes * - * Provides S3 file access for filenames of the form s3:planeName/bucketName/path - * Configuration is retrieved from storage plane definitions. - * - * Features: - * - Uses latest AWS C++ SDK with modern patterns - * - Thread-safe operations with improved error handling - * - Configurable read-ahead buffering and caching - * - Support for multipart uploads for large files - * - Proper credential management via AWS credential chain - * - Comprehensive logging and metrics - * - Support for S3-compatible services (MinIO, etc.)
+ * Filenames: s3:<plane>/<path> or s3:<plane>/d<device>/<path> (multi-device) + * Configuration from storage plane definitions (storageapi type "s3"). */ -// Modern S3 file interface extern "C" { extern S3FILE_API void installFileHook(); extern S3FILE_API void removeFileHook(); - extern S3FILE_API IFile *createS3File(const char* s3FileName); - extern S3FILE_API bool isS3FileName(const char* fileName); + extern S3FILE_API IFile * createS3File(const char * s3FileName); + extern S3FILE_API bool isS3FileName(const char * fileName); }; -#endif // S3FILE_HPP +#endif diff --git a/common/remote/hooks/s3/s3fileTests.cpp b/common/remote/hooks/s3/s3fileTests.cpp index d9beed69eb5..686cc032aa0 100644 --- a/common/remote/hooks/s3/s3fileTests.cpp +++ b/common/remote/hooks/s3/s3fileTests.cpp @@ -1,6 +1,6 @@ /*############################################################################## - HPCC SYSTEMS software Copyright (C) 2025 HPCC Systems®. + HPCC SYSTEMS software Copyright (C) 2026 HPCC Systems®. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ #include "s3file.hpp" /* - * Unit tests for the new S3 file implementation + * Unit tests for the S3 file implementation * * These tests focus on functionality that doesn't require actual S3 connectivity: * - URL parsing and validation diff --git a/common/remote/hooks/s3/s3utils.cpp b/common/remote/hooks/s3/s3utils.cpp new file mode 100644 index 00000000000..490ee0ab284 --- /dev/null +++ b/common/remote/hooks/s3/s3utils.cpp @@ -0,0 +1,152 @@ +/*############################################################################## + + HPCC SYSTEMS software Copyright (C) 2026 HPCC Systems®. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +#include +#include +#include +#include +#include + +#include "platform.h" +#include "jlib.hpp" +#include "jmutex.hpp" +#include "jptree.hpp" +#include "jplane.hpp" +#include "jsecrets.hpp" + +#include "s3utils.hpp" + +//--------------------------------------------------------------------------------------------------------------------- +// S3 Client cache key + +struct S3ClientKey +{ + std::string planeName; + unsigned device; + bool operator==(const S3ClientKey & other) const + { + return (planeName == other.planeName) && (device == other.device); + } +}; + +namespace std { + template<> struct hash + { + size_t operator()(const S3ClientKey & key) const noexcept + { + unsigned h = hashc((const unsigned char *)key.planeName.c_str(), key.planeName.length(), fnvInitialHash32); + return hashvalue(key.device, h); + } + }; +} + +//--------------------------------------------------------------------------------------------------------------------- +// S3 Client Manager + +class S3ClientManager +{ + mutable CriticalSection cs; + std::unordered_map> clients; + +public: + Aws::S3::S3Client & getClient(const char * planeName, unsigned device) + { + CriticalBlock block(cs); + S3ClientKey key{planeName, device}; + auto it = clients.find(key); + if (it != clients.end()) + return *(it->second); + auto client = createClient(planeName, device); + auto & ref = *client; + clients.emplace(key, std::move(client)); + return ref; + } + + void cleanup() + { + CriticalBlock block(cs); + clients.clear(); + 
} + +private: + std::unique_ptr createClient(const char * planeName, unsigned device) + { + Owned plane = getStoragePlaneConfig(planeName, true); + const IPropertyTree * storageapi = plane->queryPropTree("storageapi"); + if (!storageapi) + throw makeStringExceptionV(99, "No storage api defined for plane %s", planeName); + const char * type = storageapi->queryProp("@type"); + if (!type || !strieq(type, "s3")) + throw makeStringExceptionV(99, "Storage api type for plane %s is not s3", planeName); + + VStringBuffer childPath("buckets[%u]", device); + const IPropertyTree * bucketInfo = storageapi->queryPropTree(childPath); + if (!bucketInfo) + throw makeStringExceptionV(99, "Missing bucket specification for device %u in plane %s", device, planeName); + + Aws::Client::ClientConfiguration clientConfig; + const char * regionStr = storageapi->queryProp("@region"); + if (!isEmptyString(regionStr)) + clientConfig.region = regionStr; + const char * endpointStr = storageapi->queryProp("@endpoint"); + if (!isEmptyString(endpointStr)) + clientConfig.endpointOverride = endpointStr; + + clientConfig.scheme = storageapi->getPropBool("@useSSL", true) ? 
Aws::Http::Scheme::HTTPS : Aws::Http::Scheme::HTTP; + clientConfig.connectTimeoutMs = storageapi->getPropInt("@connectTimeoutMs", 30000); + clientConfig.requestTimeoutMs = storageapi->getPropInt("@requestTimeoutMs", 300000); + clientConfig.maxConnections = storageapi->getPropInt("@maxConnections", 100); + clientConfig.retryStrategy = std::make_shared(storageapi->getPropInt("@maxRetries", defaultMaxRetries)); + + const char * secretName = bucketInfo->queryProp("@secret"); + if (!isEmptyString(secretName)) + { + StringBuffer accessKey, keyId; + getSecretValue(accessKey, "storage", secretName, "aws-access-key", true); + getSecretValue(keyId, "storage", secretName, "aws-key-id", true); + return std::make_unique( + Aws::Auth::AWSCredentials(keyId.str(), accessKey.str()), + clientConfig, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent, + true, Aws::S3::US_EAST_1_REGIONAL_ENDPOINT_OPTION::NOT_SET); + } + else + { + return std::make_unique( + std::make_shared(), + clientConfig, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent, + true, Aws::S3::US_EAST_1_REGIONAL_ENDPOINT_OPTION::NOT_SET); + } + } +}; + +static S3ClientManager & queryS3ClientManager() +{ + static S3ClientManager manager; + return manager; +} + +Aws::S3::S3Client & getS3Client(const char * planeName, unsigned device) +{ + return queryS3ClientManager().getClient(planeName, device); +} + +void cleanupS3Clients() +{ + queryS3ClientManager().cleanup(); +} diff --git a/common/remote/hooks/s3/s3utils.hpp b/common/remote/hooks/s3/s3utils.hpp new file mode 100644 index 00000000000..c38267647b3 --- /dev/null +++ b/common/remote/hooks/s3/s3utils.hpp @@ -0,0 +1,86 @@ +/*############################################################################## + + HPCC SYSTEMS software Copyright (C) 2026 HPCC Systems®. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +#ifndef S3UTILS_HPP +#define S3UTILS_HPP + +#include "jlib.hpp" +#include "jlog.hpp" +#include "jstring.hpp" +#include "jexcept.hpp" +#include "jutil.hpp" + +#include + +/* + * Common utility functions and constants shared by S3 file and API copy implementations + */ + +constexpr const char * s3FilePrefix = "s3:"; +constexpr size_t s3FilePrefixLen = 3; +constexpr unsigned defaultMaxRetries = 3; + +// S3 client cache — one client per (plane, device) pair +Aws::S3::S3Client & getS3Client(const char * planeName, unsigned device); +void cleanupS3Clients(); + +// Retry with exponential backoff — rethrows IException*, retries std::exception +template +static auto retryS3(const char * op, const char * context, Fn && fn) -> decltype(fn()) +{ + unsigned attempt = 0; + for (;;) + { + try + { + return fn(); + } + catch (IException *) + { + throw; + } + catch (const std::exception & e) + { + attempt++; + VStringBuffer msg("%s failed (attempt %u/%u) for %s: %s", op, attempt, defaultMaxRetries, context, e.what()); + OWARNLOG("%s", msg.str()); + if (attempt >= defaultMaxRetries) + throw makeStringException(-1, msg); + Sleep((1U << attempt) * 100 + (getRandom() % 100)); + } + } +} + +// Retry helper that converts AWS SDK error outcomes to exceptions for the retry loop +template +static void retryS3Op(const char * op, const char * context, Fn && fn) +{ + retryS3(op, context, [&]() + { + auto outcome = fn(); + if (!outcome.IsSuccess()) + { + auto & error = outcome.GetError(); + VStringBuffer msg("%s 
failed for %s: %s - %s", op, context, + error.GetExceptionName().c_str(), error.GetMessage().c_str()); + throw std::runtime_error(msg.str()); + } + LOG(MCdebugProgress, "%s succeeded for %s", op, context); + }); +} + +#endif diff --git a/dockerfiles/s3-hook-overlay.dockerfile b/dockerfiles/s3-hook-overlay.dockerfile new file mode 100644 index 00000000000..f332c14736a --- /dev/null +++ b/dockerfiles/s3-hook-overlay.dockerfile @@ -0,0 +1,44 @@ +FROM --platform=linux/amd64 hpccsystems/platform-build-base-ubuntu-22.04:916fbc9c AS builder + +# Copy HPCC source needed for headers +COPY system/jlib /hpcc-dev/src/system/jlib +COPY system/include /hpcc-dev/src/system/include +COPY system/security/cryptohelper /hpcc-dev/src/system/security/cryptohelper +COPY common/remote/hooks/s3/s3file.cpp /hpcc-dev/src/s3file.cpp +COPY common/remote/hooks/s3/s3file.hpp /hpcc-dev/src/s3file.hpp +COPY common/remote/hooks/s3/s3api.cpp /hpcc-dev/src/s3api.cpp +COPY common/remote/hooks/s3/s3utils.cpp /hpcc-dev/src/s3utils.cpp +COPY common/remote/hooks/s3/s3utils.hpp /hpcc-dev/src/s3utils.hpp +COPY common/remote/hooks/s3/jplane_compat.hpp /hpcc-dev/src/jplane_compat.hpp + +RUN mkdir -p /hpcc-dev/build && echo '#define BUILD_TAG "s3-dev"' > /hpcc-dev/build/build-config.h + +# Patch for stock 10.2.22 API compatibility +RUN cd /hpcc-dev/src && \ + sed -i 's|#include "jplane.hpp"|#include "jplane_compat.hpp"|' s3file.cpp s3utils.cpp && \ + sed -i 's|if (source && target && isS3Type(source->getStorageType()) && isS3Type(target->getStorageType()))|if (false)|' s3api.cpp && \ + sed -i 's|source->queryPlaneName()|""|g; s|target->queryPlaneName()|""|g' s3api.cpp + +WORKDIR /hpcc-dev + +RUN g++ -shared -fPIC -o /hpcc-dev/libs3file.so \ + /hpcc-dev/src/s3utils.cpp \ + /hpcc-dev/src/s3file.cpp \ + /hpcc-dev/src/s3api.cpp \ + -DS3FILE_EXPORTS -D_CONTAINERIZED -DINLINE_GET_CYCLES_NOW \ + -I/hpcc-dev/src/system/include \ + -I/hpcc-dev/src/system/jlib \ + -I/hpcc-dev/src/system/security/cryptohelper \ + 
+ -I/hpcc-dev/src \ + -I/hpcc-dev/build \ + -I/hpcc-dev/vcpkg_installed/x64-linux-dynamic/include \ + -L/hpcc-dev/vcpkg_installed/x64-linux-dynamic/lib \ + -Wl,-rpath,/opt/HPCCSystems/lib \ + -Wl,-soname,libs3file.so \ + -laws-cpp-sdk-s3 -laws-cpp-sdk-core -ldl \ + -std=c++17 -O2 -DNDEBUG + +FROM --platform=linux/amd64 hpccsystems/platform-core:10.2.22 +USER root +COPY --from=builder /hpcc-dev/libs3file.so /opt/HPCCSystems/filehooks/libs3file.so +USER hpcc diff --git a/helm/examples/s3/README.md b/helm/examples/s3/README.md new file mode 100644 index 00000000000..62911e54b28 --- /dev/null +++ b/helm/examples/s3/README.md @@ -0,0 +1,100 @@ +# S3 Storage Plane — Deployment Guide + +## Overview + +Deploy HPCC with an S3 storage plane on EKS, using IRSA for authentication. + +## Prerequisites + +- EKS cluster with HPCC deployed via Helm +- AWS CLI, kubectl, helm, eksctl + +## Setup + +### 1. Create S3 Bucket and IRSA + +```sh +aws s3 mb s3://<bucket-name> --region us-east-1 + +aws iam create-policy --policy-name hpcc-s3-access --policy-document '{ + "Version":"2012-10-17", + "Statement":[{"Effect":"Allow", + "Action":["s3:GetObject","s3:PutObject","s3:DeleteObject","s3:ListBucket", + "s3:GetBucketLocation","s3:AbortMultipartUpload", + "s3:ListMultipartUploadParts"], + "Resource":["arn:aws:s3:::<bucket-name>","arn:aws:s3:::<bucket-name>/*"]}]}' + +eksctl create iamserviceaccount \ + --name hpcc-default --namespace default \ + --cluster <cluster-name> --region us-east-1 \ + --attach-policy-arn arn:aws:iam::<account-id>:policy/hpcc-s3-access \ + --approve --override-existing-serviceaccounts +``` + +### 2. Configure Storage Plane + +Edit `values-s3-eks.yaml` with your ECR repo, bucket name, and cluster topology. The key section is the S3 storage plane: + +```yaml +storage: + planes: + - name: s3data + prefix: "s3:s3data" + category: data + storageapi: + type: s3 + region: us-east-1 + buckets: + - name: <bucket-name> +``` + +See `values-s3.yaml` for a minimal generic example. + +### 3. 
Deploy + +```sh +helm upgrade myhpcc hpcc/hpcc -f values-s3-eks.yaml +``` + +### 4. Verify + +```ecl +ds := DATASET([{'hello'}], {STRING10 val}); +OUTPUT(ds,, '~test::s3_verify', OVERWRITE, PLANE('s3data')); +OUTPUT(DATASET('~test::s3_verify', {STRING10 val}, FLAT)); +``` + +Check data in S3: +```sh +aws s3 ls s3://<bucket-name>/ --recursive --human-readable +``` + +## Overlay Build (for stock platform-core images) + +If building the S3 hook as an overlay on a stock `platform-core` image: + +```sh +docker build --platform linux/amd64 \ + -t <account-id>.dkr.ecr.us-east-1.amazonaws.com/hpcc-s3-dev:<tag> \ + -f dockerfiles/s3-hook-overlay.dockerfile . + +docker push <account-id>.dkr.ecr.us-east-1.amazonaws.com/hpcc-s3-dev:<tag> +``` + +Then set `global.image` in your Helm values to point to the overlay image. + +## Known Limitations + +- Helm charts that predate this change reject `s3` as a `storageapi.type`; deploy with a chart whose `values.schema.json` includes it +- hthor requires separate IRSA setup on its service account (`hpcc-agent`) +- Recursive directory listing (`sub=true`) not implemented +- `jplane_compat.hpp` uses GCC-mangled symbol names for cross-version builds + +## Cleanup + +```sh +aws s3 rm s3://<bucket-name>/ --recursive +aws s3 rb s3://<bucket-name> +eksctl delete iamserviceaccount --name hpcc-default --namespace default \ + --cluster <cluster-name> --region us-east-1 +``` diff --git a/helm/examples/s3/values-s3-eks.yaml b/helm/examples/s3/values-s3-eks.yaml new file mode 100644 index 00000000000..3c612679161 --- /dev/null +++ b/helm/examples/s3/values-s3-eks.yaml @@ -0,0 +1,78 @@ +# S3 storage plane example for HPCC on EKS +# Edit the <placeholder> values below before use. 
+# +# Usage: +# helm upgrade myhpcc /tmp/hpcc-chart/hpcc -f values-s3-eks.yaml + +global: + image: + root: ".dkr.ecr.us-east-1.amazonaws.com" + name: "hpcc-s3-dev" + version: "v20" + visibilities: + local: + type: ClusterIP + +thor: +- name: thor + numWorkers: 3 + maxJobs: 1 + maxGraphs: 2 + workerResources: + cpu: "4" + memory: 24G + +roxie: +- name: roxie + numChannels: 2 + replicas: 2 + localAgent: false + services: + - name: roxie + servicePort: 9876 + visibility: local + numThreads: 30 + listenQueue: 200 + +storage: + planes: + - name: dali + hostPath: /mnt/fsx/dalistorage + prefix: /mnt/fsx/dalistorage + category: dali + - name: dll + hostPath: /mnt/fsx/queries + prefix: /mnt/fsx/queries + category: dll + - name: sasha + hostPath: /mnt/fsx/sashastorage + prefix: /mnt/fsx/sashastorage + category: sasha + - name: data + hostPath: /mnt/fsx/hpcc-data + prefix: /mnt/fsx/hpcc-data + category: data + - name: s3data + prefix: "s3:s3data" + category: data + blockedSequentialIO: 4194304 + blockedRandomIO: 65536 + storageapi: + type: s3 + region: us-east-1 # e.g. us-east-1 + buckets: + - name: hpcc-data- # e.g. hpcc-data-123456789012 + - name: mydropzone + hostPath: /mnt/fsx/mydropzone + prefix: /mnt/fsx/mydropzone + category: lz + - name: s3express + prefix: "s3:s3express" + category: data + blockedSequentialIO: 4194304 + blockedRandomIO: 65536 + storageapi: + type: s3 + region: us-east-1 + buckets: + - name: hpccbench--use1-az4--x-s3 diff --git a/helm/examples/s3/values-s3.yaml b/helm/examples/s3/values-s3.yaml new file mode 100644 index 00000000000..a3f57def056 --- /dev/null +++ b/helm/examples/s3/values-s3.yaml @@ -0,0 +1,44 @@ +# S3 storage plane example for HPCC on EKS +# Uses IAM roles for authentication (IRSA - IAM Roles for Service Accounts) +# +# Prerequisites: +# 1. Create an S3 bucket +# 2. Create an IAM role with S3 access policy +# 3. 
Associate the role with the HPCC service account via IRSA +# +# Usage: +# helm install myhpcc hpcc/hpcc -f values-s3.yaml + +storage: + planes: + - name: dali + storageClass: "" + storageSize: 1Gi + prefix: "/var/lib/HPCCSystems/dalistorage" + category: dali + - name: sasha + storageClass: "" + storageSize: 1Gi + prefix: "/var/lib/HPCCSystems/sashastorage" + category: sasha + - name: dll + storageClass: "" + storageSize: 1Gi + prefix: "/var/lib/HPCCSystems/queries" + category: dll + - name: s3data + prefix: "s3:s3data" + category: data + blockedSequentialIO: 4194304 # 4MB — batches compressed block reads for cloud I/O + blockedRandomIO: 65536 # 64KB — for index node reads + storageapi: + type: s3 + region: us-east-1 + buckets: + - name: my-hpcc-data-bucket + - name: mydropzone + prefix: "/var/lib/HPCCSystems/mydropzone" + category: lz + - name: spill + prefix: "/var/lib/HPCCSystems/hpcc-spill" + category: spill diff --git a/helm/hpcc/templates/_helpers.tpl b/helm/hpcc/templates/_helpers.tpl index f111349306e..31c19bed6de 100644 --- a/helm/hpcc/templates/_helpers.tpl +++ b/helm/hpcc/templates/_helpers.tpl @@ -235,12 +235,12 @@ storage: {{- if and (eq "data" $plane.category) (not $plane.defaultSprayParts) -}} {{- $_ := set $planeYaml "defaultSprayParts" (include "hpcc.getMaxNumWorkers" $ | int) -}} {{- end -}} - {{- /* Make sure there is enough containers provided if storageapi used*/ -}} + {{- /* Make sure there are enough containers/buckets provided if storageapi used*/ -}} {{- if $plane.storageapi -}} {{- $numDevices := int ( $plane.numDevices | default $plane.numDevices | default 1 ) }} - {{- $numContainers := len ($plane.storageapi.containers | default list) -}} - {{- if ne $numDevices $numContainers -}} - {{- $_ := fail (printf "Storage plane '%s' requires %d containers under storageapi" $plane.name $numDevices) -}} + {{- $numEntries := len ($plane.storageapi.containers | default ($plane.storageapi.buckets | default list)) -}} + {{- if ne $numDevices 
$numEntries -}} + {{- $_ := fail (printf "Storage plane '%s' requires %d containers/buckets under storageapi" $plane.name $numDevices) -}} {{- end -}} {{- end -}} {{- /* Remove pvc-related properties from the aliases*/ -}} diff --git a/helm/hpcc/values.schema.json b/helm/hpcc/values.schema.json index f162783c514..bb2d0d06649 100644 --- a/helm/hpcc/values.schema.json +++ b/helm/hpcc/values.schema.json @@ -810,29 +810,37 @@ } }, "storageapi": { - "description": "Configuration for cloud storage API integration (Azure Blob Storage, Azure Files, etc.)", + "description": "Configuration for cloud storage API integration (Azure Blob/Files, S3)", "type": "object", "properties": { "type": { "description": "Type of cloud storage API to use", "type": "string", - "enum": ["azurefile", "azureblob"] + "enum": ["azurefile", "azureblob", "s3"] }, "managed": { - "description": "Use managed identity for authentication", + "description": "Use managed identity for authentication (Azure)", "type": "boolean" }, "account": { - "description": "Default storage account name (used by containers if not specified at container level)", + "description": "Default storage account name (Azure)", "type": "string" }, "secret": { - "description": "Default secret name for credentials (used by containers if not specified at container level)", + "description": "Default secret name for credentials", + "type": "string" + }, + "region": { + "description": "AWS region for S3 (e.g. 
us-east-1)", + "type": "string" + }, + "endpoint": { + "description": "Custom S3 endpoint URL (for MinIO, etc.)", "type": "string" }, "containers": { "type": "array", - "description": "List of containers", + "description": "List of Azure containers", "items": { "description": "Storage container or file share configuration for a single device", "type": "object", @@ -852,10 +860,29 @@ }, "additionalProperties": false } + }, + "buckets": { + "type": "array", + "description": "List of S3 buckets", + "items": { + "description": "S3 bucket configuration for a single device", + "type": "object", + "properties": { + "name": { + "description": "S3 bucket name", + "type": "string" + }, + "secret": { + "description": "Secret name for AWS credentials (optional - uses IAM roles if not specified)", + "type": "string" + } + }, + "additionalProperties": false + } } }, "additionalProperties": false, - "required": [ "type", "containers" ] + "required": [ "type" ] }, "components": { "description": "Restrict storage plane mounting to only the specified components", diff --git a/helm/hpcc/values.yaml b/helm/hpcc/values.yaml index a4275d42af0..1bd80191881 100644 --- a/helm/hpcc/values.yaml +++ b/helm/hpcc/values.yaml @@ -252,14 +252,18 @@ storage: # cost: # The storage cost # storageAtRest: 0.0135 # Storage at rest cost: cost per GiB/month # storageapi: # Optional information to allow access to storage api - # type: azurefile | azureblob - # managed: true | false # (optional) use managed identity for authentication + # type: azurefile | azureblob | s3 + # managed: true | false # (optional) use managed identity for authentication (Azure) # account: # (optional) default azure storage account name # secret: # (optional) default secret name (under secrets/storage) for accessing storage account key - # containers: [ ] # a list of containers + # containers: [ ] # a list of containers (Azure) # name: # azure storage container name # account: # (optional) azure storage account name (inherits 
from parent if not specified) # secret: # (optional) secret name (under secrets/storage) for accessing storage account key (inherits from parent if not specified) + # region: # (optional) AWS region for S3 (e.g. us-east-1) + # endpoint: # (optional) custom S3 endpoint (for MinIO, etc.) + # buckets: [ ] # a list of S3 buckets (uses IAM roles for authentication via IRSA on EKS) + # name: # S3 bucket name - name: dali storageClass: "" diff --git a/system/jlib/jfile.hpp b/system/jlib/jfile.hpp index c81c73d1539..098567ae854 100644 --- a/system/jlib/jfile.hpp +++ b/system/jlib/jfile.hpp @@ -428,6 +428,7 @@ extern jlib_decl IFile * createIFile(const RemoteFilename & filename); interface IStorageApiInfo : implements IInterface { virtual const char * getStorageType() const = 0; + virtual const char * queryPlaneName() const = 0; virtual const char * queryStorageApiAccount(unsigned stripeNumber) const = 0; virtual const char * queryStorageContainerName(unsigned stripeNumber) const = 0; virtual StringBuffer & getSASToken(unsigned stripeNumber, StringBuffer & token) const = 0; diff --git a/system/jlib/jplane.cpp b/system/jlib/jplane.cpp index b155b6077ee..c1ee3dd2564 100644 --- a/system/jlib/jplane.cpp +++ b/system/jlib/jplane.cpp @@ -105,7 +105,7 @@ class CStoragePlaneAlias : public CInterfaceOf class CStorageApiInfo : public CInterfaceOf { public: - CStorageApiInfo(const IPropertyTree * _xml) : xml(_xml) + CStorageApiInfo(const char * _planeName, const IPropertyTree * _xml) : xml(_xml), planeName(_planeName) { if (!xml) // shouldn't happen throw makeStringException(MSGAUD_programmer, -1, "Invalid call: CStorageApiInfo(nullptr)"); @@ -114,6 +114,10 @@ class CStorageApiInfo : public CInterfaceOf { return xml->queryProp("@type"); } + virtual const char * queryPlaneName() const override + { + return planeName.str(); + } virtual const char * queryStorageApiAccount(unsigned stripeNumber) const override { const char *account = queryContainer(stripeNumber)->queryProp("@account"); @@ 
-150,6 +154,7 @@ class CStorageApiInfo : public CInterfaceOf return container; } Owned xml; + StringAttr planeName; }; //------------------------------------------------------------------------------------------------------------ @@ -328,7 +333,7 @@ class CStoragePlane final : public CInterfaceOf { IPropertyTree *apiInfo = config->getPropTree("storageapi"); if (apiInfo) - return new CStorageApiInfo(apiInfo); + return new CStorageApiInfo(queryName(), apiInfo); return nullptr; } diff --git a/testing/regress/ecl/s3_write_paths.ecl b/testing/regress/ecl/s3_write_paths.ecl new file mode 100644 index 00000000000..17f5a273045 --- /dev/null +++ b/testing/regress/ecl/s3_write_paths.ecl @@ -0,0 +1,28 @@ +// s3_write_paths.ecl — Integration test for S3 putObject vs multipart upload paths +// +// Verifies that small files use S3::PutObject and large files use +// S3::CreateMultipartUpload + S3::UploadPart + S3::CompleteMultipartUpload. +// +// Prerequisites: HPCC cluster with 's3data' storage plane configured. +// Run: ecl run thor s3_write_paths.ecl --server=eclwatch:8010 + +smallRec := RECORD + STRING10 val; +END; + +// Tiny file — should use S3::PutObject (single PUT) +smallDS := DATASET([{'small'}], smallRec); +OUTPUT(smallDS,, '~s3::write_path_small', OVERWRITE, PLANE('s3data')); + +bigRec := RECORD + UNSIGNED8 id; + STRING100 payload; +END; + +// ~108MB raw — exceeds 8MB multipart threshold per worker +bigDS := DATASET(1000000, TRANSFORM(bigRec, SELF.id := COUNTER, SELF.payload := (STRING100)HASH64(COUNTER))); +OUTPUT(bigDS,, '~s3::write_path_large', OVERWRITE, PLANE('s3data')); + +// Read back to verify data integrity +OUTPUT(COUNT(DATASET('~s3::write_path_small', smallRec, FLAT)), NAMED('small_count')); +OUTPUT(COUNT(DATASET('~s3::write_path_large', bigRec, FLAT)), NAMED('large_count'));