Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions drivers/src/main/java/org/gorpipe/s3/driver/S3Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ public class S3Source implements StreamSource {
private static final boolean OCI_S3_COMPATIBLE = Boolean.parseBoolean(System.getProperty("gor.oci.s3.compatible", "true"));

private static final boolean USE_META_CACHE = true ;
private final SourceReference sourceReference;
private final String bucket;
private final String key;
private final S3Client client;
private final S3AsyncClient asyncClient;
protected final SourceReference sourceReference;
protected final String bucket;
protected final String key;
protected final S3Client client;
protected final S3AsyncClient asyncClient;
private static final Map<S3Client, S3FileSystem> s3fsCache = new ConcurrentHashMap<>();
private S3SourceMetadata meta;
private static final Cache<String, S3SourceMetadata> metadataCache = CacheBuilder.newBuilder().concurrencyLevel(4).expireAfterWrite(5, TimeUnit.MINUTES).build();
Expand Down
60 changes: 57 additions & 3 deletions drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
package org.gorpipe.s3.shared;

import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.gorpipe.base.security.Credentials;
import org.gorpipe.exceptions.GorResourceException;
import org.gorpipe.gor.driver.meta.DataType;
import org.gorpipe.gor.driver.meta.SourceReference;
import org.gorpipe.gor.table.util.PathUtils;
Expand All @@ -32,7 +35,12 @@

import java.net.MalformedURLException;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

/**
* Represents an object in Amazon S3 (created from S3Shared source reference).
Expand Down Expand Up @@ -102,7 +110,8 @@ protected String s3SubPathToUriString(Path p) {
return PathUtils.formatUri(updatedUri.toString());
}

private String removeExtraFolder(String path) {
// Get extra folder, empty string if there is no extra folder.
private String getExtraFolder(String path) {
if (!path.toString().endsWith("/")) {
String fileName = PathUtils.getFileName(path);
int fileNameDotIndex = fileName.indexOf('.');
Expand All @@ -113,10 +122,18 @@ private String removeExtraFolder(String path) {
if (!Strings.isNullOrEmpty(extraFolderCand) &&
extraFolderCand.equals(PathUtils.getFileName(parentPath)) &&
!Strings.isNullOrEmpty(parentParentPath)) {
return PathUtils.resolve(parentParentPath, fileName);
return extraFolderCand;
}
}
return path;
return ""; // No extra folder
}

/**
 * Returns the given path with its extra folder level removed.
 *
 * <p>When {@code getExtraFolder} reports an extra folder for the path, the file name is
 * re-attached to the grandparent of the path; otherwise the path is returned untouched.
 *
 * @param path the path to clean up
 * @return the path without the extra folder, or {@code path} itself if it has none
 */
private String removeExtraFolder(String path) {
    // Guard clause: nothing to strip when no extra folder is detected.
    if (Strings.isNullOrEmpty(getExtraFolder(path))) {
        return path;
    }

    String grandParent = PathUtils.getParent(PathUtils.getParent(path));
    String fileName = PathUtils.getFileName(path);
    return PathUtils.resolve(grandParent, fileName);
}

@Override
Expand All @@ -128,4 +145,41 @@ public SourceReference getTopSourceReference() {
}
return top;
}

/**
 * Lists the entries directly below this source.
 *
 * <p>As S3SharedSource has artificial extra folders, the list method is overridden to get the
 * correct list of files: the source path is walked two levels deep, every path is passed
 * through {@code removeExtraFolder}, and then the source path itself, bare extra folders and
 * anything deeper than level 1 are dropped.
 *
 * @return a stream over the remaining paths, as strings
 * @throws GorResourceException if the walk or the filtering fails for any reason; the thrown
 *         exception is the result of calling {@code retry()} on it and carries the original
 *         failure as its cause
 */
@Override
public Stream<String> list() {
    try {
        // getPath() is loop invariant, resolve it once.
        Path root = getPath();
        int rootLength = root.toString().length();

        List<String> rawlist = new java.util.ArrayList<>();
        List<String> list = new java.util.ArrayList<>();
        Set<String> extraFolders = new HashSet<>();

        // Files.walk keeps directory resources open until the stream is closed, so it must be
        // used in a try-with-resources statement.
        try (Stream<Path> walked = Files.walk(root, 2)) {
            for (Path p : walked.toList()) {
                String pathString = p.toString();
                // getExtraFolder returns "" when there is no extra folder; the empty sub path
                // is skipped below anyway.
                extraFolders.add(getExtraFolder(pathString));
                rawlist.add(removeExtraFolder(pathString));
            }
        }

        for (String p : rawlist) {
            String subPath = p.substring(rootLength);
            int subPathIndex = subPath.indexOf('/');
            if (subPath.equals("")
                    || extraFolders.contains(subPath)
                    || (subPathIndex > 0 && subPathIndex < subPath.length() - 1)) {
                // If the path is empty or it is just the extra folder, or it is deeper than level 1.
                continue;
            }
            list.add(p);
        }

        return list.stream();
    } catch (Exception e) {
        Credentials cred = getCredentials(sourceReference.getSecurityContext(), "s3", bucket);
        // NOTE(review): the "secret key" slot below inspects Credentials.Attr.KEY, the same
        // attribute as the "access key" slot. This looks like a copy/paste slip - confirm which
        // attribute holds the secret and switch to it.
        throw new GorResourceException(String.format("List failed for %s, region: %s, access key: %s, secret key: %s",
                getName(), client.serviceClientConfiguration().region(),
                cred != null ? cred.getOrDefault(Credentials.Attr.KEY, "No key in creds") : "No creds",
                cred != null ? (!StringUtils.isEmpty(cred.getOrDefault(Credentials.Attr.KEY, "")) ? "Has secret" : "Empty secret")
                        : "No creds"),
                getName(), e).retry();
    }
}
}
8 changes: 5 additions & 3 deletions gortools/src/main/java/gorsat/process/GorJavaUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -579,9 +579,11 @@ public static String verifyLinkFileLastModified(ProjectContext projectContext, S

public static void writeDictionaryFromMeta(FileReader fileReader, String outfolderpath, String dictionarypath) throws IOException {
FileReader localFileReader = fileReader;
// Force meta data update on the parent (solves issue with NFS sycn)
try (Stream<String> paths = fileReader.list(PathUtils.getParent(outfolderpath))) {
// Intentionally empty.
if (PathUtils.isLocal(outfolderpath)) {
// Force metadata update on the parent (solves issue with NFS sync)
try (Stream<String> paths = fileReader.list(PathUtils.getParent(outfolderpath))) {
// Intentionally empty.
}
}
try (Stream<String> metapathstream = localFileReader.list(outfolderpath);
Writer dictionarypathwriter = new OutputStreamWriter(localFileReader.getOutputStream(dictionarypath))) {
Expand Down
30 changes: 9 additions & 21 deletions model/src/main/java/org/gorpipe/gor/driver/utils/LinkFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
* Class to work with link files, read, write and access metadata.
Expand Down Expand Up @@ -66,7 +62,7 @@ public String getPath() {
}

public void appendEntry(String link, String md5) {
entries.add(new LinkFileEntry(link, System.currentTimeMillis(), md5, getLatestEntry().serial() + 1));
entries.add(new LinkFileEntryV1(link, System.currentTimeMillis(), md5, getLatestEntry().serial() + 1));
}

public String getEntryUrl(long timestamp) {
Expand Down Expand Up @@ -128,13 +124,16 @@ public long getEntriesAgeMax() {
}

public void save(OutputStream os) {
var header = meta.formatHeader();
var content = new StringBuilder(header);
var content = switch (getMeta().getVersion()) {
case "1" -> new StringBuilder(meta.formatHeader());
default -> new StringBuilder();
};

if (!entries.isEmpty()) {
var currentTimestamp = System.currentTimeMillis();
entries.stream()
.skip(Math.max(0, entries.size() - getEntriesCountMax()))
.filter(entry -> entry.timestamp() + getEntriesAgeMax() >= currentTimestamp)
.filter(entry -> entry.timestamp() <= 0 || entry.timestamp() + getEntriesAgeMax() >= currentTimestamp)
.forEach(entry -> content.append(entry.format()).append("\n"));
}
try {
Expand All @@ -146,22 +145,11 @@ public void save(OutputStream os) {

private List<LinkFileEntry> parseEntries(String content) {
return switch (getMeta().getVersion()) {
case "1" -> parseEntriesV1(content);
default -> parseEntriesV0(content);
case "1" -> LinkFileEntryV1.parse(content);
default -> List.of(LinkFileEntryV0.parse(content));
};
}

private List<LinkFileEntry> parseEntriesV0(String content) {
return List.of(new LinkFileEntry(content.trim(), 0, "", 0));
}

private List<LinkFileEntry> parseEntriesV1(String content) {
return Arrays.stream(content.split("\n"))
.filter(l -> !l.startsWith("#"))
.map(LinkFileEntry::parse)
.sorted(Comparator.comparingLong(LinkFileEntry::timestamp))
.collect(Collectors.toCollection(ArrayList::new));
}

private static String loadContentFromSource(StreamSource source) {
try (InputStream is = source.open()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,13 @@
package org.gorpipe.gor.driver.utils;

import org.gorpipe.exceptions.GorResourceException;
import org.gorpipe.util.Strings;
interface LinkFileEntry {
String format();

/**
* Link file entry
* @param url content of the link file, path, url or a gor/nor query. Can be have more than one line.
* @param timestamp timestamp of the link file entry. Optional.
* @param md5 md5 of file or data the link points to. Optional.
* @param serial incrementing serial number for the link file entry. Optional.
*/
record LinkFileEntry(String url, long timestamp, String md5, int serial) {
public static LinkFileEntry parse(String line) {
String[] parts = line.split("\t");
if (Strings.isNullOrEmpty(parts[0])) {
throw new GorResourceException("Invalid link file entry: " + line, null);
}
return new LinkFileEntry(
parts[0].trim(),
parts.length > 1 && !Strings.isNullOrEmpty(parts[1]) ? Long.parseLong(parts[1]) : 0,
parts.length > 2 ? parts[2] : "",
parts.length > 3 && !Strings.isNullOrEmpty(parts[3]) ? Integer.parseInt(parts[3]) : 0
);
}
String url();

public String format() {
return url + "\t" + timestamp + "\t" + md5 + "\t" + serial;
}
long timestamp();

String md5();

int serial();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.gorpipe.gor.driver.utils;

/**
 * Link file entry for V0 link files, which hold nothing but the link itself.
 *
 * @param url content of the link file, path, url or a gor/nor query. Can have more than one line.
 */
record LinkFileEntryV0(String url) implements LinkFileEntry {

    /**
     * Creates an entry from raw link file content.
     *
     * @param content the link file content; leading and trailing whitespace is trimmed
     * @return the entry wrapping the trimmed content
     */
    public static LinkFileEntryV0 parse(String content) {
        String trimmed = content.trim();
        return new LinkFileEntryV0(trimmed);
    }

    /** @return always 0, there is no serial in V0 */
    public int serial() {
        return 0;
    }

    /** @return always the empty string, there is no md5 in V0 */
    public String md5() {
        return "";
    }

    /** @return always 0, there is no timestamp in V0 */
    public long timestamp() {
        return 0;
    }

    /** @return the url exactly as stored, which is the complete V0 representation */
    public String format() {
        return url();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.gorpipe.gor.driver.utils;

import org.gorpipe.exceptions.GorResourceException;
import org.gorpipe.util.Strings;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Link file entry for V1 link files: one tab separated line per entry.
 *
 * @param url content of the link file, path, url or a gor/nor query.
 * @param timestamp timestamp of the link file entry. Optional.
 * @param md5 md5 of file or data the link points to. Optional.
 * @param serial incrementing serial number for the link file entry. Optional.
 */
record LinkFileEntryV1(String url, long timestamp, String md5, int serial) implements LinkFileEntry {

    /**
     * Parses link file content into entries sorted by ascending timestamp.
     *
     * <p>Lines starting with {@code #} are skipped. Both {@code \n} and {@code \r\n} line
     * endings are accepted, so a trailing carriage return never ends up in the last field.
     *
     * @param content the link file content, one entry per line
     * @return a mutable list of the parsed entries
     * @throws GorResourceException if a line has no url or a malformed numeric field
     */
    public static List<LinkFileEntry> parse(String content) {
        return Arrays.stream(content.split("\r?\n"))
                .filter(l -> !l.startsWith("#"))
                .map(LinkFileEntryV1::parseLine)
                .sorted(Comparator.comparingLong(LinkFileEntryV1::timestamp))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Parses a single {@code url[\ttimestamp[\tmd5[\tserial]]]} line.
     *
     * @param line the line to parse
     * @return the parsed entry; missing or empty optional fields default to 0 / ""
     * @throws GorResourceException if the url is missing or a numeric field is not a number
     */
    private static LinkFileEntryV1 parseLine(String line) {
        String[] parts = line.split("\t");
        // split drops trailing empty strings, so a line made only of tabs yields an empty array.
        if (parts.length == 0 || Strings.isNullOrEmpty(parts[0])) {
            throw new GorResourceException("Invalid link file entry: " + line, null);
        }
        try {
            return new LinkFileEntryV1(
                    parts[0].trim(),
                    parts.length > 1 && !Strings.isNullOrEmpty(parts[1]) ? Long.parseLong(parts[1]) : 0,
                    parts.length > 2 ? parts[2] : "",
                    parts.length > 3 && !Strings.isNullOrEmpty(parts[3]) ? Integer.parseInt(parts[3]) : 0
            );
        } catch (NumberFormatException e) {
            // Report a bad timestamp/serial like any other invalid entry, keeping the offending line.
            throw new GorResourceException("Invalid link file entry: " + line, null, e);
        }
    }

    /**
     * Formats the entry as a single V1 line.
     *
     * @return url, timestamp, md5 and serial joined by tabs
     */
    public String format() {
        return url + "\t" + timestamp + "\t" + md5 + "\t" + serial;
    }
}
7 changes: 5 additions & 2 deletions model/src/main/java/org/gorpipe/gor/model/FileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,11 @@ public void writeLinkIfNeeded(String url) throws IOException {
if (Strings.isNullOrEmpty(url)) return;
DataSource dataSource = resolveUrl(url, true);
if (dataSource.forceLink()) {
LinkFile.create((StreamSource)dataSource, dataSource.getProjectLinkFileContent())
.save(getOutputStream(dataSource.getProjectLinkFile()));
try (OutputStream os = getOutputStream(dataSource.getProjectLinkFile())) {
// Create link file with the content of the data source.
LinkFile.create((StreamSource) dataSource, dataSource.getProjectLinkFileContent())
.save(os);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public void reload() {

// Note, we are lazy loading the table so loading here only means clearing the data. The actual load will happen
// when we need the data.
if (prevSerial.equals(TableHeader.NO_SERIAL) || !this.header.getProperty(TableHeader.HEADER_SERIAL_KEY).equals(prevSerial)) {
if (prevSerial == null || TableHeader.NO_SERIAL.equals(prevSerial) || !this.header.getProperty(TableHeader.HEADER_SERIAL_KEY).equals(prevSerial)) {
tableEntries.clear();
}

Expand Down
Loading