diff --git a/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java b/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java index e7f09aeb..84ba1b42 100644 --- a/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java +++ b/drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java @@ -77,11 +77,11 @@ public class S3Source implements StreamSource { private static final boolean OCI_S3_COMPATIBLE = Boolean.parseBoolean(System.getProperty("gor.oci.s3.compatible", "true")); private static final boolean USE_META_CACHE = true ; - private final SourceReference sourceReference; - private final String bucket; - private final String key; - private final S3Client client; - private final S3AsyncClient asyncClient; + protected final SourceReference sourceReference; + protected final String bucket; + protected final String key; + protected final S3Client client; + protected final S3AsyncClient asyncClient; private static final Map s3fsCache = new ConcurrentHashMap<>(); private S3SourceMetadata meta; private static final Cache metadataCache = CacheBuilder.newBuilder().concurrencyLevel(4).expireAfterWrite(5, TimeUnit.MINUTES).build(); diff --git a/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSource.java b/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSource.java index 54bb776f..13236f32 100644 --- a/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSource.java +++ b/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSource.java @@ -23,6 +23,9 @@ package org.gorpipe.s3.shared; import com.google.common.base.Strings; +import org.apache.commons.lang3.StringUtils; +import org.gorpipe.base.security.Credentials; +import org.gorpipe.exceptions.GorResourceException; import org.gorpipe.gor.driver.meta.DataType; import org.gorpipe.gor.driver.meta.SourceReference; import org.gorpipe.gor.table.util.PathUtils; @@ -32,7 +35,12 @@ import java.net.MalformedURLException; +import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Stream; /** * Represents an object in Amazon S3 (created from S3Shared source reference). @@ -102,7 +110,8 @@ protected String s3SubPathToUriString(Path p) { return PathUtils.formatUri(updatedUri.toString()); } - private String removeExtraFolder(String path) { + // Get extra folder, empty string if there is no extra folder. + private String getExtraFolder(String path) { if (!path.toString().endsWith("/")) { String fileName = PathUtils.getFileName(path); int fileNameDotIndex = fileName.indexOf('.'); @@ -113,10 +122,18 @@ private String removeExtraFolder(String path) { if (!Strings.isNullOrEmpty(extraFolderCand) && extraFolderCand.equals(PathUtils.getFileName(parentPath)) && !Strings.isNullOrEmpty(parentParentPath)) { - return PathUtils.resolve(parentParentPath, fileName); + return extraFolderCand; } } - return path; + return ""; // No extra folder + } + + private String removeExtraFolder(String path) { + if (!Strings.isNullOrEmpty(getExtraFolder(path))) { + return PathUtils.resolve(PathUtils.getParent(PathUtils.getParent(path)), PathUtils.getFileName(path)); + } else { + return path; + } } @Override @@ -128,4 +145,41 @@ public SourceReference getTopSourceReference() { } return top; } + + // As S3SharedSource has artificial extra folders, we need to override the list method to get the correct list of files. + @Override + public Stream list() { + try { + List rawlist = new java.util.ArrayList<>(); + List list = new java.util.ArrayList<>(); + Set extraFolders = new HashSet<>(); + + for (Path p: Files.walk(getPath(), 2).toList()) { + extraFolders.add(getExtraFolder(p.toString())); + rawlist.add(removeExtraFolder(p.toString())); + } + + for (String p: rawlist) { + String subPath = p.substring(getPath().toString().length()); + int subPathIndex = subPath.indexOf('/'); + if (subPath.equals("") + || extraFolders.contains(subPath) + || (subPathIndex > 0 && subPathIndex < subPath.length() - 1 )) { + // If the path is empty or it is just the extra folder, or it is deeper than level 1. + continue; + } + list.add(p); + } + + return list.stream(); + } catch (Exception e) { + Credentials cred = getCredentials(sourceReference.getSecurityContext(), "s3", bucket); + throw new GorResourceException(String.format("List failed for %s, region: %s, access key: %s, secret key: %s", + getName(), client.serviceClientConfiguration().region(), + cred != null ? cred.getOrDefault(Credentials.Attr.KEY, "No key in creds") : "No creds", + cred != null ? (!StringUtils.isEmpty(cred.getOrDefault(Credentials.Attr.KEY, "")) ? "Has secret" : "Empty secret") + : "No creds"), + getName(), e).retry(); + } + } } diff --git a/gortools/src/main/java/gorsat/process/GorJavaUtilities.java b/gortools/src/main/java/gorsat/process/GorJavaUtilities.java index 0df4b760..deb9305e 100644 --- a/gortools/src/main/java/gorsat/process/GorJavaUtilities.java +++ b/gortools/src/main/java/gorsat/process/GorJavaUtilities.java @@ -579,9 +579,11 @@ public static String verifyLinkFileLastModified(ProjectContext projectContext, S public static void writeDictionaryFromMeta(FileReader fileReader, String outfolderpath, String dictionarypath) throws IOException { FileReader localFileReader = fileReader; - // Force meta data update on the parent (solves issue with NFS sycn) - try (Stream paths = fileReader.list(PathUtils.getParent(outfolderpath))) { - // Intentionally empty. + if (PathUtils.isLocal(outfolderpath)) { + // Force meta data update on the parent (solves issue with NFS sycn) + try (Stream paths = fileReader.list(PathUtils.getParent(outfolderpath))) { + // Intentionally empty. + } } try (Stream metapathstream = localFileReader.list(outfolderpath); Writer dictionarypathwriter = new OutputStreamWriter(localFileReader.getOutputStream(dictionarypath))) { diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFile.java b/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFile.java index 52532195..06e8d053 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFile.java +++ b/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFile.java @@ -8,11 +8,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; import java.util.List; -import java.util.stream.Collectors; /** * Class to work with link files, read, write and access metadata. @@ -66,7 +62,7 @@ public String getPath() { } public void appendEntry(String link, String md5) { - entries.add(new LinkFileEntry(link, System.currentTimeMillis(), md5, getLatestEntry().serial() + 1)); + entries.add(new LinkFileEntryV1(link, System.currentTimeMillis(), md5, getLatestEntry().serial() + 1)); } public String getEntryUrl(long timestamp) { @@ -128,13 +124,16 @@ public long getEntriesAgeMax() { } public void save(OutputStream os) { - var header = meta.formatHeader(); - var content = new StringBuilder(header); + var content = switch (getMeta().getVersion()) { + case "1" -> new StringBuilder(meta.formatHeader()); + default -> new StringBuilder(); + }; + if (!entries.isEmpty()) { var currentTimestamp = System.currentTimeMillis(); entries.stream() .skip(Math.max(0, entries.size() - getEntriesCountMax())) - .filter(entry -> entry.timestamp() + getEntriesAgeMax() >= currentTimestamp) + .filter(entry -> entry.timestamp() <= 0 || entry.timestamp() + getEntriesAgeMax() >= currentTimestamp) .forEach(entry -> content.append(entry.format()).append("\n")); } try { @@ -146,22 +145,11 @@ public void save(OutputStream os) { private List parseEntries(String content) { return switch (getMeta().getVersion()) { - case "1" -> parseEntriesV1(content); - default -> parseEntriesV0(content); + case "1" -> LinkFileEntryV1.parse(content); + default -> List.of(LinkFileEntryV0.parse(content)); }; } - private List parseEntriesV0(String content) { - return List.of(new LinkFileEntry(content.trim(), 0, "", 0)); - } - - private List parseEntriesV1(String content) { - return Arrays.stream(content.split("\n")) - .filter(l -> !l.startsWith("#")) - .map(LinkFileEntry::parse) - .sorted(Comparator.comparingLong(LinkFileEntry::timestamp)) - .collect(Collectors.toCollection(ArrayList::new)); - } private static String loadContentFromSource(StreamSource source) { try (InputStream is = source.open()) { diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntry.java b/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntry.java index 4944a183..222af571 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntry.java +++ b/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntry.java @@ -1,30 +1,13 @@ package org.gorpipe.gor.driver.utils; -import org.gorpipe.exceptions.GorResourceException; -import org.gorpipe.util.Strings; +interface LinkFileEntry { + String format(); -/** - * Link file entry - * @param url content of the link file, path, url or a gor/nor query. Can be have more than one line. - * @param timestamp timestamp of the link file entry. Optional. - * @param md5 md5 of file or data the link points to. Optional. - * @param serial incrementing serial number for the link file entry. Optional. - */ -record LinkFileEntry(String url, long timestamp, String md5, int serial) { - public static LinkFileEntry parse(String line) { - String[] parts = line.split("\t"); - if (Strings.isNullOrEmpty(parts[0])) { - throw new GorResourceException("Invalid link file entry: " + line, null); - } - return new LinkFileEntry( - parts[0].trim(), - parts.length > 1 && !Strings.isNullOrEmpty(parts[1]) ? Long.parseLong(parts[1]) : 0, - parts.length > 2 ? parts[2] : "", - parts.length > 3 && !Strings.isNullOrEmpty(parts[3]) ? Integer.parseInt(parts[3]) : 0 - ); - } + String url(); - public String format() { - return url + "\t" + timestamp + "\t" + md5 + "\t" + serial; - } + long timestamp(); + + String md5(); + + int serial(); } diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV0.java b/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV0.java new file mode 100644 index 00000000..7e3e10bf --- /dev/null +++ b/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV0.java @@ -0,0 +1,27 @@ +package org.gorpipe.gor.driver.utils; + +/** + * Link file entry + * @param url content of the link file, path, url or a gor/nor query. Can be have more than one line. + */ +record LinkFileEntryV0(String url) implements LinkFileEntry { + public static LinkFileEntryV0 parse(String content) { + return new LinkFileEntryV0(content.trim()); + } + + public String format() { + return url; + } + + public long timestamp() { + return 0; // No timestamp in V0 + } + + public String md5() { + return ""; // No md5 in V0 + } + + public int serial() { + return 0; // No serial in V0 + } +} diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV1.java b/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV1.java new file mode 100644 index 00000000..c85b554d --- /dev/null +++ b/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV1.java @@ -0,0 +1,44 @@ +package org.gorpipe.gor.driver.utils; + +import org.gorpipe.exceptions.GorResourceException; +import org.gorpipe.util.Strings; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Link file entry + * @param url content of the link file, path, url or a gor/nor query. Can be have more than one line. + * @param timestamp timestamp of the link file entry. Optional. + * @param md5 md5 of file or data the link points to. Optional. + * @param serial incrementing serial number for the link file entry. Optional. + */ +record LinkFileEntryV1(String url, long timestamp, String md5, int serial) implements LinkFileEntry { + public static List parse(String content) { + return Arrays.stream(content.split("\n")) + .filter(l -> !l.startsWith("#")) + .map(LinkFileEntryV1::parseLine) + .sorted(Comparator.comparingLong(LinkFileEntryV1::timestamp)) + .collect(Collectors.toCollection(ArrayList::new)); + } + + private static LinkFileEntryV1 parseLine(String line) { + String[] parts = line.split("\t"); + if (Strings.isNullOrEmpty(parts[0])) { + throw new GorResourceException("Invalid link file entry: " + line, null); + } + return new LinkFileEntryV1( + parts[0].trim(), + parts.length > 1 && !Strings.isNullOrEmpty(parts[1]) ? Long.parseLong(parts[1]) : 0, + parts.length > 2 ? parts[2] : "", + parts.length > 3 && !Strings.isNullOrEmpty(parts[3]) ? Integer.parseInt(parts[3]) : 0 + ); + } + + public String format() { + return url + "\t" + timestamp + "\t" + md5 + "\t" + serial; + } +} diff --git a/model/src/main/java/org/gorpipe/gor/model/FileReader.java b/model/src/main/java/org/gorpipe/gor/model/FileReader.java index cf67a1d8..7149d1b8 100644 --- a/model/src/main/java/org/gorpipe/gor/model/FileReader.java +++ b/model/src/main/java/org/gorpipe/gor/model/FileReader.java @@ -344,8 +344,11 @@ public void writeLinkIfNeeded(String url) throws IOException { if (Strings.isNullOrEmpty(url)) return; DataSource dataSource = resolveUrl(url, true); if (dataSource.forceLink()) { - LinkFile.create((StreamSource)dataSource, dataSource.getProjectLinkFileContent()) - .save(getOutputStream(dataSource.getProjectLinkFile())); + try (OutputStream os = getOutputStream(dataSource.getProjectLinkFile())) { + // Create link file with the content of the data source. + LinkFile.create((StreamSource) dataSource, dataSource.getProjectLinkFileContent()) + .save(os); + } } } diff --git a/model/src/main/java/org/gorpipe/gor/table/dictionary/DictionaryTableReader.java b/model/src/main/java/org/gorpipe/gor/table/dictionary/DictionaryTableReader.java index 826398be..c05b912b 100644 --- a/model/src/main/java/org/gorpipe/gor/table/dictionary/DictionaryTableReader.java +++ b/model/src/main/java/org/gorpipe/gor/table/dictionary/DictionaryTableReader.java @@ -286,7 +286,7 @@ public void reload() { // Note, we are lazy loading the table so loading here only means clearing the data. The actual load will happen // when we need the data. - if (prevSerial.equals(TableHeader.NO_SERIAL) || !this.header.getProperty(TableHeader.HEADER_SERIAL_KEY).equals(prevSerial)) { + if (prevSerial == null || TableHeader.NO_SERIAL.equals(prevSerial) || !this.header.getProperty(TableHeader.HEADER_SERIAL_KEY).equals(prevSerial)) { tableEntries.clear(); }