diff --git a/.gitignore b/.gitignore index f1e6c569..105f8663 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,8 @@ **/.run/ **/.vscode/ +**/.kiro/ +**/.zed/ **/.project **/.classpath diff --git a/documentation/src/all_commands.rst b/documentation/src/all_commands.rst index ad3957c6..340a6b98 100644 --- a/documentation/src/all_commands.rst +++ b/documentation/src/all_commands.rst @@ -118,4 +118,5 @@ command/LOG command/TRYSELECT command/TRYWHERE - command/TRYHIDE \ No newline at end of file + command/TRYHIDE + command/TSVAPPEND \ No newline at end of file diff --git a/documentation/src/command/TSVAPPEND.rst b/documentation/src/command/TSVAPPEND.rst new file mode 100644 index 00000000..bf5d715e --- /dev/null +++ b/documentation/src/command/TSVAPPEND.rst @@ -0,0 +1,43 @@ +.. raw:: html + + Used in: nor + +.. _TSVAPPEND: + +===== +TSVAPPEND +===== +The :ref:`TSVAPPEND` command can be used to append a stream to a file. + +Usage +===== + +.. code-block:: gor + + nor ... | tsvappend [-noheader] [-prefix ] [-link ] + +Options +======= + ++-------------------+-----------------------------------------------------------------+ +| ``-prefix `` | Takes in a text source containing prefix to be prepended to the | +| | file written. Also support string in single quotes | ++-------------------+-----------------------------------------------------------------+ +| ``-noheader`` | Don't write a header lines. Not valid with gor/gorz/nor/norz. | ++-------------------+-----------------------------------------------------------------+ +| ``-link `` | Writes a link file pointing to the the . | +| | | ++-------------------+-----------------------------------------------------------------+ +| ``-vlink `` | Writes a versioned link file pointing to the the . | +| | The should not be overwritten if it has previously | +| | been used in a link file. | ++-------------------+-----------------------------------------------------------------+ + +Examples +======== + +.. code-block:: gor + + nor fileA.nor | tsvappend fileB.nor + +The query above will apppend the contents of ``fileA.gor`` to the file ``fileB.gor``. diff --git a/documentation/src/command/WRITE.rst b/documentation/src/command/WRITE.rst index 431c603a..ca0ee087 100644 --- a/documentation/src/command/WRITE.rst +++ b/documentation/src/command/WRITE.rst @@ -22,46 +22,53 @@ Usage .. code-block:: gor - gor ... | write filename [-f forkCol] [-d] [-r] [-c] [-m] [-i type] + gor ... | write [-f forkCol] [-d] [-r] [-c] [-m] [-i type] Options ======= -+-----------------+-----------------------------------------------------------------+ -| ``-f column`` | The "fork column" used to split the output into multiple files. | -+-----------------+-----------------------------------------------------------------+ -| ``-d`` | Use subdirectories instead of #{fork} in filename for forkwrite.| -+-----------------+-----------------------------------------------------------------+ -| ``-r`` | Eliminate the fork column from the output. | -+-----------------+-----------------------------------------------------------------+ -| ``-c`` | Use column store compression for the output. | -+-----------------+-----------------------------------------------------------------+ -| ``-m`` | Create MD5 sum file along with the output file. | -+-----------------+-----------------------------------------------------------------+ -| ``-maxseg`` | Write maxseg to the gor meta file. | -+-----------------+-----------------------------------------------------------------+ -| ``-inferschema``| Write schema to the gor meta file. | -+-----------------+-----------------------------------------------------------------+ -| ``-i type`` | Write index file (.gori) with a .gorz file, (.tbi) with .vcf.gz | -| | Must state the type, which can be FULL, CHROM or TABIX | -+-----------------+-----------------------------------------------------------------+ -| ``-l level`` | Compression level (0-9). Default 1. | -+-----------------+-----------------------------------------------------------------+ -| ``-t 'tags'`` | List of tags which write ensures a file will be created. | -| | Only valid with the -f option. | -+-----------------+-----------------------------------------------------------------+ -| ``-tags 'tags'``| List of tags/alias to use in the resulting dictionary when | -| | writing the files to directories. Usually used with partgor | -| | as ``-tags #{tags}``. | -+-----------------+-----------------------------------------------------------------+ -| ``-prefix hf`` | Takes in a text source containing prefix to be prepended to the | -| | file written. Also support string in single quotes | -+-----------------+-----------------------------------------------------------------+ -| ``-noheader`` | Don't write a header lines. Not valid with gor/gorz/nor/norz. | -+-----------------+-----------------------------------------------------------------+ -| ``-card 'cols'``| Calculate cardinality of columns in 'cols' and adds to the | -| | outputs meta data. | -+-----------------+-----------------------------------------------------------------+ ++-------------------+-----------------------------------------------------------------+ +| ``-f `` | The "fork column" used to split the output into multiple files. | ++-------------------+-----------------------------------------------------------------+ +| ``-d`` | Use subdirectories instead of #{fork} in filename for forkwrite.| ++-------------------+-----------------------------------------------------------------+ +| ``-r`` | Eliminate the fork column from the output. | ++-------------------+-----------------------------------------------------------------+ +| ``-c`` | Use column store compression for the output. | ++-------------------+-----------------------------------------------------------------+ +| ``-m`` | Create MD5 sum file along with the output file. | ++-------------------+-----------------------------------------------------------------+ +| ``-maxseg`` | Write maxseg to the gor meta file. | ++-------------------+-----------------------------------------------------------------+ +| ``-inferschema`` | Write schema to the gor meta file. | ++-------------------+-----------------------------------------------------------------+ +| ``-i `` | Write index file (.gori) with a .gorz file, (.tbi) with .vcf.gz | +| | Must state the type, which can be FULL, CHROM or TABIX | ++-------------------+-----------------------------------------------------------------+ +| ``-l `` | Compression level (0-9). Default 1. | ++-------------------+-----------------------------------------------------------------+ +| ``-t ''`` | List of tags which write ensures a file will be created. | +| | Only valid with the -f option. | ++-------------------+-----------------------------------------------------------------+ +| ``-tags ''``| List of tags/alias to use in the resulting dictionary when | +| | writing the files to directories. Usually used with partgor | +| | as ``-tags #{tags}``. | ++-------------------+-----------------------------------------------------------------+ +| ``-prefix `` | Takes in a text source containing prefix to be prepended to the | +| | file written. Also support string in single quotes | ++-------------------+-----------------------------------------------------------------+ +| ``-noheader`` | Don't write a header lines. Not valid with gor/gorz/nor/norz. | ++-------------------+-----------------------------------------------------------------+ +| ``-card ''``| Calculate cardinality of columns in '' and adds to the | +| | outputs meta data. | ++-------------------+-----------------------------------------------------------------+ +| ``-link `` | Writes a link file pointing to the the . | +| | | ++-------------------+-----------------------------------------------------------------+ +| ``-vlink `` | Writes a versioned link file pointing to the the . | +| | The should not be overwritten if it has previously | +| | been used in a link file. | ++-------------------+-----------------------------------------------------------------+ Examples ======== diff --git a/documentation/src/commands.rst b/documentation/src/commands.rst index 27e72646..fc041842 100644 --- a/documentation/src/commands.rst +++ b/documentation/src/commands.rst @@ -270,8 +270,11 @@ Commands - Same as SELECT, but ignores errors generated from incorrect syntax. - GOR/NOR * - :ref:`TRYWHERE` - - Same as WHERE, but ignores errors generated from incorrect syntax. + - Same as WHERE, but ignores errors generated from incorrect syntax. - GOR/NOR + * - :ref:`TSVAPPEND` + - Similar as WRITE, but appends to the given file. + - NOR * - :ref:`UNPIVOT` - Takes information in multiple rows and splits them into multiple rows as attribute-value pairs. - GOR/NOR diff --git a/drivers/src/main/java/org/gorpipe/s3/shared/S3ProjectSharedProjectSourceProvider.java b/drivers/src/main/java/org/gorpipe/s3/shared/S3ProjectSharedProjectSourceProvider.java index 02fba1aa..25c958e8 100644 --- a/drivers/src/main/java/org/gorpipe/s3/shared/S3ProjectSharedProjectSourceProvider.java +++ b/drivers/src/main/java/org/gorpipe/s3/shared/S3ProjectSharedProjectSourceProvider.java @@ -46,7 +46,7 @@ protected String getFallbackUrl(String url) { @Override protected void updateSharedSourceLink(S3SharedSource source, String project) { - source.setProjectLinkFile(DataUtil.toFile(PathUtils.stripTrailingSlash(getRelativePath(source)), DataType.LINK)); + source.setProjectLinkFile(DataUtil.toLink(PathUtils.stripTrailingSlash(getRelativePath(source)))); source.setProjectLinkFileContent(String.format("%sprojects/%s/%s", S3ProjectSharedSourceType.PREFIX, project, getRelativePath(source))); diff --git a/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSource.java b/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSource.java index 13236f32..7acb2d3b 100644 --- a/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSource.java +++ b/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSource.java @@ -26,9 +26,9 @@ import org.apache.commons.lang3.StringUtils; import org.gorpipe.base.security.Credentials; import org.gorpipe.exceptions.GorResourceException; -import org.gorpipe.gor.driver.meta.DataType; import org.gorpipe.gor.driver.meta.SourceReference; import org.gorpipe.gor.table.util.PathUtils; +import org.gorpipe.gor.util.DataUtil; import org.gorpipe.s3.driver.*; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; @@ -140,7 +140,7 @@ private String removeExtraFolder(String path) { public SourceReference getTopSourceReference() { // Shared source should be access though links, so find the first link (which should be the direct access link) SourceReference top = getSourceReference(); - while (top.getParentSourceReference() != null && !top.getParentSourceReference().getUrl().endsWith(DataType.LINK.suffix)) { + while (top.getParentSourceReference() != null && !DataUtil.isLink(top.getParentSourceReference().getUrl())) { top = top.getParentSourceReference(); } return top; diff --git a/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSourceProvider.java b/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSourceProvider.java index 0a4d6ef4..76562ed5 100644 --- a/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSourceProvider.java +++ b/drivers/src/main/java/org/gorpipe/s3/shared/S3SharedSourceProvider.java @@ -123,7 +123,7 @@ public S3SharedSource resolveDataSource(SourceReference sourceReference) } protected void updateSharedSourceLink(S3SharedSource source, String project) { - source.setProjectLinkFile(DataUtil.toFile(PathUtils.stripTrailingSlash(getRelativePath(source)), DataType.LINK)); + source.setProjectLinkFile(DataUtil.toLink(getRelativePath(source))); source.setProjectLinkFileContent(findSharedSourceLinkContent(source)); } @@ -216,10 +216,7 @@ private String createErrorMessageForFailure(SourceReference sourceReference, S3S private SourceReference createFallbackSourceReference(SourceReference sourceReference) { String fallbackUrl = getFallbackUrl(sourceReference.getUrl()); if (fallbackUrl != null) { - SourceReference updatedSourceReference = new SourceReference(fallbackUrl, sourceReference.securityContext, sourceReference.commonRoot, - sourceReference.getLookup(), sourceReference.getLinkSubPath(), - sourceReference.isWriteSource()); - return updatedSourceReference; + return new SourceReference(fallbackUrl, sourceReference); } return null; diff --git a/drivers/src/test/java/org/gorpipe/s3/shared/ITestS3Shared.java b/drivers/src/test/java/org/gorpipe/s3/shared/ITestS3Shared.java index 2dd104cb..44408a63 100644 --- a/drivers/src/test/java/org/gorpipe/s3/shared/ITestS3Shared.java +++ b/drivers/src/test/java/org/gorpipe/s3/shared/ITestS3Shared.java @@ -9,6 +9,7 @@ import org.gorpipe.gor.driver.PluggableGorDriver; import org.gorpipe.gor.driver.meta.DataType; import org.gorpipe.gor.driver.meta.SourceReference; +import org.gorpipe.gor.driver.meta.SourceReferenceBuilder; import org.gorpipe.gor.model.DriverBackedFileReader; import org.gorpipe.gor.model.DriverBackedSecureFileReader; import org.gorpipe.gor.model.FileReader; @@ -19,7 +20,6 @@ import org.gorpipe.utils.DriverUtils; import org.junit.*; import org.junit.contrib.java.lang.system.EnvironmentVariables; -import org.junit.contrib.java.lang.system.ProvideSystemProperty; import org.junit.contrib.java.lang.system.RestoreSystemProperties; import org.junit.contrib.java.lang.system.SystemErrRule; import org.junit.experimental.categories.Category; @@ -402,7 +402,7 @@ public void testProjectWriteUserDataWithIndexServer() throws IOException { source.delete(); } - Assert.assertTrue(Files.exists(Path.of(gorRoot, dataPath + ".link"))); + Assert.assertTrue(Files.exists(Path.of(gorRoot, dataPath + DataType.LINK.suffix))); } @Test @@ -554,7 +554,7 @@ public void testWriteExplicitWrite() throws IOException { private DataSource getDataSourceFromProvider(S3SharedSourceProvider provider, String relativePath, Credentials.OwnerType ownerType, String owner) throws IOException { - SourceReference sourceReference = new SourceReference.Builder(provider.getSharedUrlPrefix() + relativePath) + SourceReference sourceReference = new SourceReferenceBuilder(provider.getSharedUrlPrefix() + relativePath) .commonRoot("projects/some_project") .securityContext(createSecurityContext(provider.getService(), ownerType, owner, S3_KEY, S3_SECRET)) .build(); diff --git a/drivers/src/test/java/org/gorpipe/s3/shared/UTestS3Shared.java b/drivers/src/test/java/org/gorpipe/s3/shared/UTestS3Shared.java index 4cf137fc..ad7fbfcb 100644 --- a/drivers/src/test/java/org/gorpipe/s3/shared/UTestS3Shared.java +++ b/drivers/src/test/java/org/gorpipe/s3/shared/UTestS3Shared.java @@ -5,6 +5,7 @@ import org.gorpipe.base.security.Credentials; import org.gorpipe.gor.driver.DataSource; import org.gorpipe.gor.driver.meta.SourceReference; +import org.gorpipe.gor.driver.meta.SourceReferenceBuilder; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -198,7 +199,7 @@ public void testResolveSourceGlobaluseHighestTypeInLinksFalse() throws IOExcepti private DataSource getDataSourceFromProvider(S3SharedSourceProvider provider, String relativePath, Credentials.OwnerType ownerType, String owner) throws IOException { - SourceReference sourceReference = new SourceReference.Builder(provider.getSharedUrlPrefix() + relativePath) + SourceReference sourceReference = new SourceReferenceBuilder(provider.getSharedUrlPrefix() + relativePath) .commonRoot("projects/some_project") .securityContext(createSecurityContext(provider.getService(), "some_s3_bucket", ownerType, owner)) .build(); diff --git a/gorscripts/src/test/java/org/gorpipe/gorshell/UTestHelpCmd.java b/gorscripts/src/test/java/org/gorpipe/gorshell/UTestHelpCmd.java index 35b0ebad..4717af66 100644 --- a/gorscripts/src/test/java/org/gorpipe/gorshell/UTestHelpCmd.java +++ b/gorscripts/src/test/java/org/gorpipe/gorshell/UTestHelpCmd.java @@ -39,7 +39,7 @@ public class UTestHelpCmd { "GORROWS, GORSQL, GRANNO, GREP, GROUP, GTGEN, GTLD, HIDE, INSET, JOIN, LDANNO, LEFTJOIN, LEFTWHERE, LIFTOVER, LOG, MAP, MERGE,", "MULTIMAP, NOR, NORCMD, NORIF, NORROWS, NORSQL, PARALLEL, PARTGOR, PEDPIVOT, PGOR, PILEUP, PIPESTEPS, PIVOT, PLINKREGRESSION, PREFIX,", "RANK, REGRESSION, REGSEL, RENAME, REPLACE, ROWNUM, SDL, SED, SEGHIST, SEGPROJ, SEGSPAN, SELECT, SEQ, SETCOLTYPE, SIGNATURE, SKIP,", - "SORT, SPLIT, SQL, TEE, THROWIF, TOGOR, TOP, TRYHIDE, TRYSELECT, TRYWHERE, UNPIVOT, UNTIL, VARGROUP, VARIANTS, VARJOIN, VARMERGE,", + "SORT, SPLIT, SQL, TEE, THROWIF, TOGOR, TOP, TRYHIDE, TRYSELECT, TRYWHERE, TSVAPPEND, UNPIVOT, UNTIL, VARGROUP, VARIANTS, VARJOIN, VARMERGE,", "VARNORM, VERIFYCOLTYPE, VERIFYORDER, WAIT, WHERE, WRITE", "", "->ADJUST", diff --git a/gortools/src/main/java/gorsat/process/GorJavaUtilities.java b/gortools/src/main/java/gorsat/process/GorJavaUtilities.java index 0a03a605..fab83af1 100644 --- a/gortools/src/main/java/gorsat/process/GorJavaUtilities.java +++ b/gortools/src/main/java/gorsat/process/GorJavaUtilities.java @@ -614,9 +614,9 @@ public static void writeDictionaryFromMeta(FileReader fileReader, String outfold dictionarypathwriter.write(entry); } } else writeDummyHeader(dictionarypathwriter); - - localFileReader.writeLinkIfNeeded(dictionarypath); } + + localFileReader.writeLinkIfNeeded(dictionarypath); } public static Optional parseDictionaryColumn(String[] dictList, FileReader fileReader) { diff --git a/gortools/src/main/java/org/gorpipe/gor/manager/BucketManager.java b/gortools/src/main/java/org/gorpipe/gor/manager/BucketManager.java index 838c1a6a..01087cda 100644 --- a/gortools/src/main/java/org/gorpipe/gor/manager/BucketManager.java +++ b/gortools/src/main/java/org/gorpipe/gor/manager/BucketManager.java @@ -589,7 +589,7 @@ private void deleteFileIfExists(String path) { private void deleteLinkFileIfExists(String path) { try { - String linkFile = DataUtil.isLink(path) ? path : DataUtil.toFile(path, DataType.LINK); + String linkFile = DataUtil.isLink(path) ? path : DataUtil.toLink(path); DataSource linkSource = table.getFileReader().resolveDataSource(table.getFileReader().createSourceReference(linkFile, false)); if (linkSource != null && linkSource.exists()) { linkSource.delete(); @@ -715,7 +715,7 @@ private boolean containsBucketFile(String bucketFile, Set usedBuckets) { if (usedBuckets.contains(bucketFile)) { return true; } - if (bucketFile.endsWith(".link")) { + if (DataUtil.isLink(bucketFile)) { String bucketFileNoLink = bucketFile.substring(0, bucketFile.length() - 5); if (usedBuckets.contains(bucketFileNoLink)) { return true; diff --git a/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala b/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala index e3d7ebd4..940f0eaa 100644 --- a/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala +++ b/gortools/src/main/scala/gorsat/Analysis/ForkWrite.scala @@ -28,7 +28,9 @@ import gorsat.Outputs.OutFile import org.apache.commons.io.FilenameUtils import org.gorpipe.exceptions.GorResourceException import org.gorpipe.gor.binsearch.GorIndexType +import org.gorpipe.gor.driver.linkfile.LinkFile import org.gorpipe.gor.driver.meta.DataType +import org.gorpipe.gor.driver.providers.stream.sources.StreamSource import org.gorpipe.gor.model.{DriverBackedFileReader, GorMeta, GorOptions, Row} import org.gorpipe.gor.session.{GorSession, ProjectContext} import org.gorpipe.gor.table.util.PathUtils @@ -55,6 +57,7 @@ case class OutputOptions(remove: Boolean = false, writeMeta: Boolean = true, cardCol: String = null, linkFile: String = "", + linkFileVersion: Int = 1, command: String = null, infer: Boolean = false, maxseg: Boolean = false @@ -271,17 +274,17 @@ case class ForkWrite(forkCol: Int, if (useFork) { forkMap.values.foreach(sh => { - val (linkFile, linkFileContent) = extractLink(sh.fileName) + val (linkFile, linkFileContent, linkFileVersion) = extractLink(sh.fileName) if (linkFile.nonEmpty) { - writeLinkFile(linkFile, linkFileContent) + writeLinkFile(linkFile, linkFileContent, linkFileVersion) } }) } else { - val (linkFile, linkFileContent) = extractLink(fullFileName) + val (linkFile, linkFileContent, linkFileVersion) = extractLink(fullFileName, options.linkFile, options.linkFileVersion) if (linkFile.nonEmpty) { - writeLinkFile(linkFile, linkFileContent) + writeLinkFile(linkFile, linkFileContent, linkFileVersion, getMd5) } } } @@ -295,40 +298,38 @@ case class ForkWrite(forkCol: Int, } } - private def extractLink(fileName: String) : (String,String) = { - var linkFile = options.linkFile + private def extractLink(fileName: String, optLinkFile: String = "", optLinkFileVersion: Int = 0) : (String, String, Int) = { + var linkFile = optLinkFile + var linkFileVersion = optLinkFileVersion var linkFileContent = "" if (fileName.nonEmpty) { - if (linkFile.isEmpty) { + if (linkFile.isEmpty) { val dataSource = session.getProjectContext.getFileReader.resolveUrl(fileName, true) if (dataSource != null && dataSource.forceLink()) { linkFile = dataSource.getProjectLinkFile linkFileContent = dataSource.getProjectLinkFileContent + linkFileVersion = 0 } } else { linkFileContent = PathUtils.resolve(session.getProjectContext.getProjectRoot, fileName) } } - (linkFile,linkFileContent) + (linkFile,linkFileContent,linkFileVersion) } - private def writeLinkFile(linkFile: String, linkFileContent: String) : Unit = { - val linkFileToWrite = if (DataUtil.isLink(linkFile)) - linkFile - else - DataUtil.toFile(linkFile, DataType.LINK) + private def writeLinkFile(linkFilePath: String, linkFileContent: String, + linkFileVersion: Int = 0, md5: String = null) : Unit = { + val linkFileToWrite = LinkFile.validateAndUpdateLinkFileName(linkFilePath, linkFileVersion) - // Use non driver file reader as this is exception from the write no links rule, add extra resolve with the - // server reader to validate (skip link extension as writing links is allow forbidden). + // Validate that we can write to the location (skip link extension as writing links is always forbidden). session.getProjectContext.getFileReader.resolveUrl(FilenameUtils.removeExtension(linkFileToWrite), true) + + // Use the nonsecure driver file reader as this is an exception from the write no links rule. val fileReader = new DriverBackedFileReader(session.getProjectContext.getFileReader.getSecurityContext, session.getProjectContext.getProjectRoot) - val os = fileReader.getOutputStream(linkFileToWrite) - try { - os.write(linkFileContent.getBytes()) - os.write('\n') - } finally { - if (os != null) os.close() - } + + LinkFile.load(fileReader.resolveUrl(linkFileToWrite, true).asInstanceOf[StreamSource], linkFileVersion) + .appendEntry(linkFileContent, md5, fileReader) + .save() } } diff --git a/gortools/src/main/scala/gorsat/Commands/TsvAppend.scala b/gortools/src/main/scala/gorsat/Commands/TsvAppend.scala index a87d4080..b342cd62 100644 --- a/gortools/src/main/scala/gorsat/Commands/TsvAppend.scala +++ b/gortools/src/main/scala/gorsat/Commands/TsvAppend.scala @@ -32,7 +32,7 @@ import org.gorpipe.gor.util.StringUtil class TsvAppend extends CommandInfo("TSVAPPEND", - CommandArguments("-noheader", "-prefix -link", 0), + CommandArguments("-noheader", "-prefix -link -vlink", 0), CommandOptions(gorCommand = false, norCommand = true, verifyCommand = true)) { override def processArguments(context: GorContext, argString: String, iargs: Array[String], args: Array[String], executeNor: Boolean, forcedInputHeader: String): CommandParsingResult = { @@ -72,7 +72,16 @@ class TsvAppend extends CommandInfo("TSVAPPEND", else prefixFile = Option(prfx) } - val link = stringValueOfOptionWithDefault(args,"-link","") + if (hasOption(args, "-link") && hasOption(args, "-vlink")) { + throw new GorParsingException("Options -link and -vlink are mutually exclusive") + } + val (link, linkVersion) = if (hasOption(args, "-link")) { + (stringValueOfOption(args, "-link"), 0) + } else if (hasOption(args, "-vlink")) { + (stringValueOfOption(args, "-vlink"), 1) + } else { + ("", 0) + } val fixedHeader = forcedInputHeader.split("\t").slice(0, 2).mkString("\t") @@ -87,6 +96,7 @@ class TsvAppend extends CommandInfo("TSVAPPEND", prefixFile=prefixFile, skipHeader=skipHeader, linkFile=link, + linkFileVersion=linkVersion, command=argString ) ), diff --git a/gortools/src/main/scala/gorsat/Commands/Write.scala b/gortools/src/main/scala/gorsat/Commands/Write.scala index 7cd52383..4527408e 100644 --- a/gortools/src/main/scala/gorsat/Commands/Write.scala +++ b/gortools/src/main/scala/gorsat/Commands/Write.scala @@ -34,7 +34,7 @@ import org.gorpipe.gor.util.DataUtil class Write extends CommandInfo("WRITE", - CommandArguments("-r -c -m -inferschema -maxseg -noheader", "-d -f -i -t -l -tags -card -prefix -link", 0), + CommandArguments("-r -c -m -inferschema -maxseg -noheader", "-d -f -i -t -l -tags -card -prefix -link -vlink", 0), CommandOptions(gorCommand = true, norCommand = true, verifyCommand = true)) { override def processArguments(context: GorContext, argString: String, iargs: Array[String], args: Array[String], executeNor: Boolean, forcedInputHeader: String): CommandParsingResult = { @@ -62,7 +62,17 @@ class Write extends CommandInfo("WRITE", columnCompress = hasOption(args, "-c") md5 = hasOption(args, "-m") if (hasOption(args, "-l")) compressionLevel = stringValueOfOptionWithErrorCheck(args, "-l", Array("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")).toInt - val link = stringValueOfOptionWithDefault(args,"-link","") + + if (hasOption(args, "-link") && hasOption(args, "-vlink")) { + throw new GorParsingException("Options -link and -vlink are mutually exclusive") + } + val (link, linkVersion) = if (hasOption(args, "-link")) { + (stringValueOfOption(args, "-link"), 0) + } else if (hasOption(args, "-vlink")) { + (stringValueOfOption(args, "-vlink"), 1) + } else { + ("", 0) + } if(fileName.isEmpty && useFolder.isEmpty) throw new GorResourceException("No file or folder specified",""); @@ -139,6 +149,7 @@ class Write extends CommandInfo("WRITE", skipHeader, cardCol = card, linkFile = link, + linkFileVersion = linkVersion, command = argString, infer = infer, maxseg = maxseg diff --git a/gortools/src/main/scala/gorsat/InputSources/Gor.scala b/gortools/src/main/scala/gorsat/InputSources/Gor.scala index 9f9296ea..7ffb80de 100644 --- a/gortools/src/main/scala/gorsat/InputSources/Gor.scala +++ b/gortools/src/main/scala/gorsat/InputSources/Gor.scala @@ -40,7 +40,7 @@ import scala.collection.mutable.ListBuffer object Gor { val options: List[String] = List("-nowithin", "-stdin", "-nf", "-fs", "-w", "-Y", "-g", "-q") - val valueOptions: List[String] = List("-s", "-f", "-ff", "-b", "-Z", "-dict", "-parts", "-p", "-seek", "-idx", "-ref", "-c", "-H", "-X") + val valueOptions: List[String] = List("-s", "-f", "-ff", "-b", "-Z", "-dict", "-parts", "-p", "-seek", "-idx", "-ref", "-c", "-H", "-X", "-time") } class Gor() extends InputSourceInfo("GOR", CommandArguments(Gor.options.mkString(" "), Gor.valueOptions.mkString(" "), 1)) { diff --git a/gortools/src/main/scala/gorsat/InputSources/Meta.scala b/gortools/src/main/scala/gorsat/InputSources/Meta.scala index 797d0f2c..b4a0bc13 100644 --- a/gortools/src/main/scala/gorsat/InputSources/Meta.scala +++ b/gortools/src/main/scala/gorsat/InputSources/Meta.scala @@ -38,7 +38,10 @@ class Meta() extends InputSourceInfo("META", CommandArguments("", "", 1)) { val dataSource = reader.resolveUrl(new SourceReferenceBuilder(inputData) .commonRoot(reader.getCommonRoot()) - .securityContext(reader.getSecurityContext()).isFallback(false).build()) + .securityContext(reader.getSecurityContext()) + .isFallback(false) + .queryTime(reader.getQueryTime) + .build()) val factory =GorDriverFactory.fromConfig() val it = factory.createMetaIterator(dataSource, reader) diff --git a/gortools/src/main/scala/gorsat/InputSources/Nor.scala b/gortools/src/main/scala/gorsat/InputSources/Nor.scala index f62fb788..aa97e516 100644 --- a/gortools/src/main/scala/gorsat/InputSources/Nor.scala +++ b/gortools/src/main/scala/gorsat/InputSources/Nor.scala @@ -39,7 +39,7 @@ import java.nio.file.{Files, Path} object Nor { private val norOptions: List[String] = List("-h", "-asdict", "-r", "-i", "-m", "-nl", "-fs", "-nv") - private val norValueOptions: List[String] = List("-f", "-ff", "-s", "-d", "-c") + private val norValueOptions: List[String] = List("-f", "-ff", "-s", "-d", "-c", "-time") private val norifValueOptions: List[String] = norValueOptions ::: List("-dh") private val gornorOptions: List[String] = norOptions.filter(element => element != "nv") @@ -59,6 +59,11 @@ object Nor val inputParams = iargs(0) var inputSource: GenomicIterator = null + if (hasOption(args, "-time")) { + val queryTime = CommandParseUtilities.epochValueOfOption(args, "-time") + context.getSession.getProjectContext.getFileReader.setQueryTime(queryTime) + } + try { val inputUpper = inputParams.toUpperCase if (CommandParseUtilities.isNestedCommand(inputParams)) { diff --git a/gortools/src/main/scala/gorsat/QueryHandlers/GeneralQueryHandler.scala b/gortools/src/main/scala/gorsat/QueryHandlers/GeneralQueryHandler.scala index 2141777b..88b6d715 100644 --- a/gortools/src/main/scala/gorsat/QueryHandlers/GeneralQueryHandler.scala +++ b/gortools/src/main/scala/gorsat/QueryHandlers/GeneralQueryHandler.scala @@ -58,11 +58,11 @@ class GeneralQueryHandler(context: GorContext, header: Boolean) extends GorParal } else { linkCacheFileNameBase } - val linkCacheFilePath = Path.of(linkCacheFileNameBaseAdjusted + ".link") + val linkCacheFilePath = Path.of(DataUtil.toLink(linkCacheFileNameBaseAdjusted)) Files.writeString(linkCacheFilePath, PathUtils.resolve(nested.getSession.getProjectContext.getProjectRoot, writeLocationPath).toString) - val extension = FileNameUtils.getExtension(linkCacheFileNameBaseAdjusted) + ".link" + val extension = DataUtil.toLink(FileNameUtils.getExtension(linkCacheFileNameBaseAdjusted)) (linkCacheFilePath, extension) } @@ -181,16 +181,10 @@ class GeneralQueryHandler(context: GorContext, header: Boolean) extends GorParal () => block } - - override def setForce(force: Boolean): Unit = { } - override def setQueryTime(time: lang.Long): Unit = { - - } - override def getWaitTime: Long = { -1 } diff --git a/gortools/src/main/scala/gorsat/process/TestSessionFactory.scala b/gortools/src/main/scala/gorsat/process/TestSessionFactory.scala index 0dc26411..55e81790 100644 --- a/gortools/src/main/scala/gorsat/process/TestSessionFactory.scala +++ b/gortools/src/main/scala/gorsat/process/TestSessionFactory.scala @@ -38,7 +38,9 @@ import scala.jdk.javaapi.CollectionConverters * @param whitelistedCmdFiles File for white listing * @param server Indicates if the session is running on server or not */ -class TestSessionFactory(pipeOptions: PipeOptions, whitelistedCmdFiles:String, server:Boolean, securityContext:String = null, allowedWriteLocations:Array[String] = Array[String]("test", "user_data", "result_cache")) extends GorSessionFactory{ +class TestSessionFactory(pipeOptions: PipeOptions, whitelistedCmdFiles:String, server:Boolean, + securityContext:String = null, + allowedWriteLocations:Array[String] = Array[String]("test", "user_data", "result_cache")) extends GorSessionFactory { override def create(): GorSession = { val requestId = pipeOptions.requestId @@ -49,7 +51,7 @@ class TestSessionFactory(pipeOptions: PipeOptions, whitelistedCmdFiles:String, s val allowedWriteLocationList = CollectionConverters.asJava(allowedWriteLocations) - val fileReader = createFileReader(pipeOptions.gorRoot, securityContext, server, allowedWriteLocationList); + val fileReader = createFileReader(pipeOptions.gorRoot, securityContext, server, allowedWriteLocationList, System.currentTimeMillis()); val projectContextBuilder = new ProjectContext.Builder() val projectContext = projectContextBuilder .setAliasFile(pipeOptions.aliasFile) @@ -83,17 +85,18 @@ class TestSessionFactory(pipeOptions: PipeOptions, whitelistedCmdFiles:String, s session } - def createFileReader(gorRoot: String, securityContext: String = null, server: Boolean = false, writeLocations: util.List[String]): DriverBackedFileReader = { + def createFileReader(gorRoot: String, securityContext: String = null, server: Boolean = false, + writeLocations: util.List[String], startTime: Long): DriverBackedFileReader = { val emptyGorRoot = StringUtil.isEmpty(gorRoot) if (!emptyGorRoot || !StringUtil.isEmpty(securityContext)) { if(server) { new DriverBackedSecureFileReader(gorRoot, securityContext, - AccessControlContext.builder().withWriteLocations(writeLocations).build()) + AccessControlContext.builder().withWriteLocations(writeLocations).build(), startTime) } else { - new DriverBackedFileReader(securityContext, if(emptyGorRoot) "." else gorRoot) + new DriverBackedFileReader(securityContext, if(emptyGorRoot) "." else gorRoot, startTime) } } else { - new DriverBackedFileReader(securityContext, if(emptyGorRoot) "." else gorRoot) + new DriverBackedFileReader(securityContext, if(emptyGorRoot) "." else gorRoot, startTime) } } } diff --git a/gortools/src/test/java/gorsat/UTestGorWrite.java b/gortools/src/test/java/gorsat/UTestGorWrite.java index c35629e8..ba3ec0f1 100644 --- a/gortools/src/test/java/gorsat/UTestGorWrite.java +++ b/gortools/src/test/java/gorsat/UTestGorWrite.java @@ -25,7 +25,13 @@ import org.apache.commons.io.FileUtils; import org.gorpipe.exceptions.GorParsingException; import org.gorpipe.exceptions.GorSecurityException; +import org.gorpipe.exceptions.GorSystemException; +import org.gorpipe.gor.driver.linkfile.LinkFile; +import org.gorpipe.gor.driver.linkfile.LinkFileMeta; +import org.gorpipe.gor.driver.linkfile.LinkFileV1; import org.gorpipe.gor.driver.meta.DataType; +import org.gorpipe.gor.driver.providers.stream.sources.file.FileSource; +import org.gorpipe.gor.model.BaseMeta; import org.gorpipe.gor.util.DataUtil; import org.junit.*; import org.junit.rules.TemporaryFolder; @@ -51,11 +57,21 @@ public class UTestGorWrite { public TemporaryFolder tempRoot = new TemporaryFolder(); private Path tempRootPath; + private String defaultV1LinkFileHeader; + private String testdbsnpTestLine1 = """ + Chrom\tPOS\treference\tallele\tdifferentrsIDs + chr1\t10179\tC\tCC\trs367896724 + """; + @Before public void setupTest() throws IOException { workDirPath = workDir.getRoot().toPath(); Files.createDirectories(workDirPath.resolve("result_cache")); tempRootPath = tempRoot.getRoot().toPath(); + + var meta = new LinkFileMeta(); + meta.setProperty(BaseMeta.HEADER_VERSION_KEY, "1"); + defaultV1LinkFileHeader = meta.formatHeader(); } @Test @@ -67,8 +83,30 @@ public void testWritePathWithLinkFile() throws IOException { Assert.assertEquals(workDirPath.resolve("dbsnp2.gor").toString() + "\n", Files.readString(workDirPath.resolve("dbsnp3.gor.link"))); String linkresult = TestUtils.runGorPipe("gor dbsnp3.gor | top 1", "-gorroot", workDirPath.toString()); - Assert.assertEquals("Chrom\tPOS\treference\tallele\tdifferentrsIDs\n" + - "chr1\t10179\tC\tCC\trs367896724\n", linkresult); + Assert.assertEquals(testdbsnpTestLine1, linkresult); + } + + @Test + public void testWritePathWithVersionedLinkFile() throws IOException { + Path p = Paths.get("../tests/data/gor/dbsnp_test.gor"); + Files.copy(p, workDirPath.resolve("dbsnp.gor")); + TestUtils.runGorPipe("gor dbsnp.gor | write dbsnp2.gor -vlink dbsnp3.gor", "-gorroot", workDirPath.toString()); + + Assert.assertTrue( Files.readString(workDirPath.resolve("dbsnp3.gor.link")).startsWith( + defaultV1LinkFileHeader + workDirPath.resolve("dbsnp2.gor") + "\t")); + + String linkresult = TestUtils.runGorPipe("gor dbsnp3.gor | top 1", "-gorroot", workDirPath.toString()); + Assert.assertEquals(testdbsnpTestLine1, linkresult); + } + + @Test + public void testWritePathWithBothLinkTypes() throws IOException { + Path p = Paths.get("../tests/data/gor/dbsnp_test.gor"); + Files.copy(p, workDirPath.resolve("dbsnp.gor")); + var e = Assert.assertThrows(GorParsingException.class, + () -> TestUtils.runGorPipe("gor dbsnp.gor | write dbsnp2.gor -link dbsnp4.gor -vlink dbsnp3.gor", + "-gorroot", workDirPath.toString())); + Assert.assertTrue(e.getMessage().contains("Options -link and -vlink are mutually exclusive")); } @Test @@ -77,11 +115,24 @@ public void testWritePathWithServerLinkFile() throws IOException { Files.copy(p, workDirPath.resolve("dbsnp.gor")); TestUtils.runGorPipe(new String[] {"gor dbsnp.gor | write user_data/dbsnp2.gor -link user_data/dbsnp3.gor", "-gorroot", workDirPath.toString()}, true); - Assert.assertEquals(workDirPath.resolve("user_data").resolve(DataUtil.toFile("dbsnp2", DataType.GOR)).toString() + "\n", Files.readString(workDirPath.resolve("user_data").resolve(DataUtil.toLinkFile("dbsnp3", DataType.GOR)))); + Assert.assertEquals(workDirPath.resolve("user_data").resolve(DataUtil.toFile("dbsnp2", DataType.GOR)).toString() + "\n", + Files.readString(workDirPath.resolve("user_data").resolve(DataUtil.toLinkFile("dbsnp3", DataType.GOR)))); + + String linkresult = TestUtils.runGorPipe("gor user_data/dbsnp3.gor | top 1", "-gorroot", workDirPath.toString()); + Assert.assertEquals(testdbsnpTestLine1, linkresult); + } + + @Test + public void testWritePathWithServerVersionedLinkFile() throws IOException { + Path p = Paths.get("../tests/data/gor/dbsnp_test.gor"); + Files.copy(p, workDirPath.resolve("dbsnp.gor")); + TestUtils.runGorPipe(new String[] {"gor dbsnp.gor | write user_data/dbsnp2.gor -vlink user_data/dbsnp3.gor", "-gorroot", workDirPath.toString()}, true); + + Assert.assertTrue( Files.readString(workDirPath.resolve("user_data").resolve(DataUtil.toLinkFile("dbsnp3", DataType.GOR))).startsWith( + defaultV1LinkFileHeader + workDirPath.resolve("user_data").resolve(DataUtil.toFile("dbsnp2", DataType.GOR)).toString() + "\t")); String linkresult = TestUtils.runGorPipe("gor user_data/dbsnp3.gor | top 1", "-gorroot", workDirPath.toString()); - Assert.assertEquals("Chrom\tPOS\treference\tallele\tdifferentrsIDs\n" + - "chr1\t10179\tC\tCC\trs367896724\n", linkresult); + Assert.assertEquals(testdbsnpTestLine1, linkresult); } @Test @@ -89,7 +140,10 @@ public void testWritePathWithUnAuthorizedServerLinkFile() throws IOException { Path p = Paths.get("../tests/data/gor/dbsnp_test.gor"); Files.copy(p, workDirPath.resolve("dbsnp.gor")); - Assert.assertThrows( "Writing link to un-writable project location, throws exception",GorSecurityException.class, () -> TestUtils.runGorPipe(new String[]{"gor dbsnp.gor | write user_data/dbsnp2.gor -link /tmp/dbsnp3.gor", "-gorroot", workDirPath.toString()}, true)); + Assert.assertThrows( "Writing link to un-writable project location, throws exception", + GorSecurityException.class, + () -> TestUtils.runGorPipe(new String[]{"gor dbsnp.gor | write user_data/dbsnp2.gor -link /tmp/dbsnp3.gor", + "-gorroot", workDirPath.toString()}, true)); } @@ -110,6 +164,19 @@ public void testWritePathWithExistingLinkFile() throws IOException { Assert.assertEquals(workDirPath.resolve("dbsnp2.gor").toString() + "\n", Files.readString(workDirPath.resolve("dbsnp3.gor.link"))); } + @Test + public void testWritePathWithExistingVersionedLinkFile() throws IOException { + Path p = Paths.get("../tests/data/gor/dbsnp_test.gor"); + Files.copy(p, workDirPath.resolve("dbsnp.gor")); + Files.writeString(workDirPath.resolve("dbsnp3.gor.link"), workDirPath.resolve("dbsnp.gor").toString() + "\n"); + TestUtils.runGorPipe("gor dbsnp.gor | write dbsnp2.gor -vlink dbsnp3.gor", "-gorroot", workDirPath.toString()); + + Assert.assertTrue(Files.readString(workDirPath.resolve("dbsnp3.gor.link")).startsWith( + defaultV1LinkFileHeader + + workDirPath.resolve("dbsnp.gor") + "\t0\t\t0\n" + + workDirPath.resolve("dbsnp2.gor") + "\t")); + } + @Test public void testWritePathWithExistingBadLinkFile() throws IOException { Path p = Paths.get("../tests/data/gor/dbsnp_test.gor"); @@ -120,6 +187,51 @@ public void testWritePathWithExistingBadLinkFile() throws IOException { Assert.assertEquals(workDirPath.resolve("dbsnp2.gor").toString() + "\n", Files.readString(workDirPath.resolve("dbsnp3.gor.link"))); } + @Test + public void testWritePathWithExistingBadVersionedLinkFile() throws IOException { + Path p = Paths.get("../tests/data/gor/dbsnp_test.gor"); + Files.copy(p, workDirPath.resolve("dbsnp.gor")); + Files.writeString(workDirPath.resolve("dbsnp3.gor.link"), ""); + TestUtils.runGorPipe("gor dbsnp.gor | write dbsnp2.gor -vlink dbsnp3.gor", "-gorroot", workDirPath.toString()); + + Assert.assertTrue(Files.readString(workDirPath.resolve("dbsnp3.gor.link")).startsWith( + defaultV1LinkFileHeader + workDirPath.resolve("dbsnp2.gor") + "\t")); + } + + @Test + public void testOverwritePathWithExistingLinkFile() throws IOException { + Path p = Paths.get("../tests/data/gor/dbsnp_test.gor"); + Files.copy(p, workDirPath.resolve("dbsnp.gor")); + + TestUtils.runGorPipe("gor dbsnp.gor | write dbsnp2.gor -link dbsnp3.gor", "-gorroot", workDirPath.toString()); + TestUtils.runGorPipe("gor dbsnp.gor | write dbsnp2.gor -link dbsnp3.gor", "-gorroot", workDirPath.toString()); + + Assert.assertEquals(workDirPath.resolve("dbsnp2.gor").toString() + "\n", + Files.readString(workDirPath.resolve("dbsnp3.gor.link"))); + + } + + @Test + public void testOverwritePathWithExistingVersionedLinkFile() throws IOException { + Path p = Paths.get("../tests/data/gor/dbsnp_test.gor"); + Files.copy(p, workDirPath.resolve("dbsnp.gor")); + + TestUtils.runGorPipe("gor dbsnp.gor | write dbsnp2.gor -vlink dbsnp3.gor", "-gorroot", workDirPath.toString()); + LinkFile linkFile = LinkFile.load(new FileSource(workDirPath.resolve("dbsnp3.gor.link").toString())); + Assert.assertEquals(1, linkFile.getEntriesCount()); + + // Test with same file. + TestUtils.runGorPipe("gor dbsnp.gor | write dbsnp2.gor -vlink dbsnp3.gor", "-gorroot", workDirPath.toString()); + linkFile = LinkFile.load(new FileSource(workDirPath.resolve("dbsnp3.gor.link").toString())); + Assert.assertEquals(2, linkFile.getEntriesCount()); + + // Test with different file + Assert.assertThrows( "Overwriting link with same path, throws exception", + GorSystemException.class, + () -> TestUtils.runGorPipe("gor dbsnp.gor | top 1 | write dbsnp2.gor -vlink dbsnp3.gor", + "-gorroot", workDirPath.toString())); + } + @Test public void testTxtWriteServer() throws IOException { Path p = Paths.get("../tests/data/nor/simple.nor"); diff --git a/gortools/src/test/java/gorsat/UTestGorWriteExplicit.java b/gortools/src/test/java/gorsat/UTestGorWriteExplicit.java index 21ff4b23..4da6ba7d 100644 --- a/gortools/src/test/java/gorsat/UTestGorWriteExplicit.java +++ b/gortools/src/test/java/gorsat/UTestGorWriteExplicit.java @@ -1,6 +1,8 @@ package gorsat; +import org.gorpipe.gor.model.DriverBackedFileReader; import org.gorpipe.gor.table.dictionary.gor.GorDictionaryTableMeta; +import org.gorpipe.gor.util.DataUtil; import org.junit.*; import org.junit.contrib.java.lang.system.RestoreSystemProperties; import org.junit.rules.TemporaryFolder; @@ -416,7 +418,7 @@ public void testPartGorWrite() throws IOException { Assert.assertEquals(WRONG_RES_PARTGOR, expected, result); var header = new GorDictionaryTableMeta(); - header.loadAndMergeMeta(folderpath.resolve(DEFAULT_FOLDER_DICTIONARY_NAME)); + header.loadAndMergeMeta(new DriverBackedFileReader(""), folderpath.resolve(DEFAULT_FOLDER_DICTIONARY_NAME).toString()); Assert.assertEquals("false", header.getProperty(GorDictionaryTableMeta.HEADER_LINE_FILTER_KEY)); partsize = 4; @@ -493,7 +495,7 @@ public void testPartGorPgorDictionaryHeader() throws IOException { public void testExplicitWrite() throws IOException { var query = "create a = gor ../tests/data/gor/genes.gor | top 1 | write "+ workDirPath.resolve("test.gor").toAbsolutePath() +"; gor [a] | group chrom -count"; var results = TestUtils.runGorPipe(query,"-cachedir",cachePath.toString()); - Assert.assertTrue(Files.walk(cachePath).filter(p -> p.toString().endsWith(".gor")).allMatch(p -> p.endsWith(".link"))); + Assert.assertTrue(Files.walk(cachePath).filter(p -> p.toString().endsWith(".gor")).allMatch(p -> DataUtil.isLink(p.toString()))); Assert.assertEquals(WRONG_RESULT, "Chrom\tbpStart\tbpStop\tallCount\nchr1\t0\t250000000\t1\n", results); } @@ -501,7 +503,7 @@ public void testExplicitWrite() throws IOException { public void testExplicitWriteFolder() throws IOException { var query = "create a = pgor ../tests/data/gor/genes.gor | top 1 | write "+ workDirPath.resolve("test.gord").toAbsolutePath() +"; gor [a] | group chrom -count"; var results = TestUtils.runGorPipe(query,"-cachedir",cachePath.toString()); - Assert.assertTrue(Files.walk(cachePath).filter(p -> p.toString().endsWith(".gord")).allMatch(p -> p.endsWith(".link"))); + Assert.assertTrue(Files.walk(cachePath).filter(p -> p.toString().endsWith(".gord")).allMatch(p -> DataUtil.isLink(p.toString()))); Assert.assertEquals(WRONG_RESULT, GENE_GROUP_CHROM_TOP1, results); String[] result = TestUtils.runGorPipe("nor -asdict " + workDirPath.resolve("test.gord")).split("\n"); diff --git a/gortools/src/test/java/gorsat/monitors/StatsMonitorTest.java b/gortools/src/test/java/gorsat/monitors/StatsMonitorTest.java index 063a8396..42802c86 100644 --- a/gortools/src/test/java/gorsat/monitors/StatsMonitorTest.java +++ b/gortools/src/test/java/gorsat/monitors/StatsMonitorTest.java @@ -87,7 +87,7 @@ public void testRowCountAndBytesCountForAddedWrite() { var outputOptions = new OutputOptions(false, false, true, false, false, GorIndexType.NONE, new String[0], new String[0], Option.empty(), Option.empty(), Deflater.BEST_SPEED, - Option.empty(), false, false, null, "", null, false, false); + Option.empty(), false, false, null, "", 0, null, false, false); forkWrite = new ForkWrite(-1, workDirPath.resolve("test.gor").toString(), pipe.getSession(), pipe.getHeader(), outputOptions); pipe.lastStep().$bar(forkWrite); diff --git a/model/src/main/java/org/gorpipe/gor/driver/PluggableGorDriver.java b/model/src/main/java/org/gorpipe/gor/driver/PluggableGorDriver.java index bd69f40b..1f2c5abe 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/PluggableGorDriver.java +++ b/model/src/main/java/org/gorpipe/gor/driver/PluggableGorDriver.java @@ -26,6 +26,7 @@ import org.gorpipe.exceptions.GorException; import org.gorpipe.exceptions.GorResourceException; import org.gorpipe.exceptions.GorSystemException; +import org.gorpipe.gor.driver.linkfile.LinkFile; import org.gorpipe.gor.driver.meta.DataType; import org.gorpipe.gor.driver.meta.IndexableSourceReference; import org.gorpipe.gor.driver.meta.SourceReference; @@ -33,6 +34,7 @@ import org.gorpipe.gor.driver.providers.stream.FileCache; import org.gorpipe.gor.driver.providers.stream.StreamSourceIteratorFactory; import org.gorpipe.gor.driver.providers.stream.StreamSourceProvider; +import org.gorpipe.gor.driver.providers.stream.sources.StreamSource; import org.gorpipe.gor.model.FileReader; import org.gorpipe.gor.model.GenomicIterator; import org.gorpipe.gor.model.GenomicIteratorBase; @@ -222,7 +224,7 @@ public SourceType[] getSupportedSourceTypes() { private DataSource handleLinks(DataSource source) throws IOException { if (source.getDataType() == LINK) { if (source.exists()) { - var sourceRef = getSourceRef(source, readLink(source), null); + var sourceRef = getSourceRef(source, LinkFile.load((StreamSource)source).getEntryUrl(source.getSourceReference().queryTime), null); sourceRef.setLinkLastModified(source.getSourceMetadata().getLastModified()); source.close(); DataSource rawLinkSource = resolveDataSource(sourceRef); @@ -301,14 +303,6 @@ private DataSource recursiveLinkFolderFallBack(DataSource source, Path parent, P return source; } - @Override - public String readLink(DataSource source) throws IOException { - SourceReference sourceReference = source.getSourceReference(); - String linkSubPath = sourceReference.getLinkSubPath(); - String linkText = sourceTypeToSourceProvider.get(source.getSourceType()).readLink(source); - return linkSubPath != null ? linkText + linkSubPath : linkText; - } - @Override public GorDriverConfig config() { return config; diff --git a/model/src/main/java/org/gorpipe/gor/driver/SourceProvider.java b/model/src/main/java/org/gorpipe/gor/driver/SourceProvider.java index 84e98452..0a4e99dc 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/SourceProvider.java +++ b/model/src/main/java/org/gorpipe/gor/driver/SourceProvider.java @@ -62,14 +62,6 @@ default void setCache(FileCache cache) {} */ DataSource wrap(DataSource source) throws IOException; - /** - * Read contents of a link source. A link source contains a reference to - * another source - possible handled by another provider. - */ - default String readLink(DataSource source) throws IOException { - throw new UnsupportedEncodingException("Links unsupported"); - } - /** * Create a genomic iterator from a source. */ diff --git a/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFile.java b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFile.java new file mode 100644 index 00000000..c96b752e --- /dev/null +++ b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFile.java @@ -0,0 +1,262 @@ +package org.gorpipe.gor.driver.linkfile; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.util.concurrent.UncheckedExecutionException; +import org.gorpipe.exceptions.GorResourceException; +import org.gorpipe.gor.driver.meta.SourceReference; +import org.gorpipe.gor.driver.providers.stream.StreamUtils; +import org.gorpipe.gor.driver.providers.stream.sources.StreamSource; +import org.gorpipe.gor.model.FileReader; +import org.gorpipe.gor.table.util.PathUtils; +import org.gorpipe.gor.util.DataUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Class to work with link files, read, write and access metadata. + * + * Link file format, a valid nor format. Note, the required fields form the current link file format. + * + * ## VERSION= + * ## ENTRIES_COUNT_MAX= + * ## ENTRIES_AGE_MAX= + * # FILE\tTIMESTAMP\tMD5\tSERIAL + * source/var/var.gorz\t1734304890790\tABCDEAF13422\t1 + * source/var/var.gorz\t1734305124533\t334DEAF13422\t2 + * + * Notes: + * 1. No timestamp or serial is treated as 0 (older). + * 2. Entries are added to the bottom. + * 3. If entries have the same timestamp, the appearing later in the file is picked. + * + */ +public abstract class LinkFile { + + public static final int LINK_FILE_MAX_SIZE = 10000; + + private static final boolean USE_LINK_CACHE = Boolean.parseBoolean(System.getProperty("gor.driver.cache.link", "true")); + private static final Cache linkCache = Caffeine.newBuilder() + .maximumSize(10000) + .expireAfterWrite(2, TimeUnit.HOURS).build(); + + public static LinkFile load(StreamSource source) throws IOException { + var content = loadContentFromSource(source); + return load(source, content); + } + + public static LinkFile load(StreamSource source, String content) { + var meta = LinkFileMeta.createAndLoad(content); + + if ("1".equals(meta.getVersion())) { + return new LinkFileV1(source, meta, content); + } else { + return new LinkFileV0(source, meta, content); + } + } + + public static LinkFile load(StreamSource source, int linkVersion) throws IOException { + switch (linkVersion) { + case 0: + return new LinkFileV0(source); + case 1: + default: + return new LinkFileV1(source); + } + } + + public static String validateAndUpdateLinkFileName(String linkFilePath, int linkVersion) { + if (DataUtil.isLink(linkFilePath)) { + return linkFilePath; + } else { + //return linkVersion == 0 ? DataUtil.toLink(linkFilePath) : DataUtil.toVersionedLink(linkFilePath); + return DataUtil.toLink(linkFilePath); + } + } + + protected final StreamSource source; + protected final LinkFileMeta meta; + protected final List entries; // Entries sorted by time (oldest first) + + /** + * Create a new link file from source and content. + * + * @param source the source to create the link file from + * @param content the content of the link file, can be empty or null to create an empty link file. + */ + protected LinkFile(StreamSource source, String content) { + this(source, LinkFileMeta.createAndLoad(content), content); + } + + protected LinkFile(StreamSource source, LinkFileMeta meta, String content) { + this.source = source; + this.meta = meta; + this.entries = parseEntries(content); + } + + public LinkFileMeta getMeta() { + return meta; + } + + public String getPath() { + return source.getFullPath(); + } + + public String getEntryUrl(long timestamp) { + return getUrlFromEntry(getEntry(timestamp)); + } + + public String getLatestEntryUrl() { + return getUrlFromEntry(getLatestEntry()); + } + + private String getUrlFromEntry(LinkFileEntry entry) { + var linkUrl = entry != null ? entry.url() : null; + if (linkUrl != null && !PathUtils.isAbsolutePath(linkUrl) && this.source != null) { + // Allow relative links: + linkUrl = PathUtils.resolve(PathUtils.getParent(this.source.getFullPath()), linkUrl); + } + + // Handle link sub-path if needed. + SourceReference sourceReference = source.getSourceReference(); + if (sourceReference != null) { + String linkSubPath = sourceReference.getLinkSubPath(); + linkUrl = linkSubPath != null ? linkUrl + linkSubPath : linkUrl; + } + + return linkUrl; + } + + protected String getHeader() { + return meta.formatHeader(); + } + + List getEntries() { + return entries; + } + + public int getEntriesCount() { + return entries.size(); + } + + /** + * Get the entry that matches the timestamp. + * @param timestamp timestamp to match + * @return best match entry or null if no entries. + */ + LinkFileEntry getEntry(long timestamp) { + int index = entries.size() - 1; + while (index >= 0 && entries.get(index).timestamp() > timestamp) { + index--; + } + return index >= 0 ? entries.get(index) : null; + } + + /** + * Get the latest entry. + * @return the latest entry + */ + LinkFileEntry getLatestEntry() { + return entries != null && !entries.isEmpty() ? entries.get(entries.size() - 1) : null; + } + + public void setEntriesCountMax(int entriesCountMax) { + meta.setEntriesCountMax(entriesCountMax); + } + + public int getEntriesCountMax() { + return meta.getEntriesCountMax(); + } + + public void setEntriesAgeMax(int entriesAgeMax) { + meta.setEntriesAgeMax(entriesAgeMax); + } + + public long getEntriesAgeMax() { + return meta.getEntriesAgeMax(); + } + + public LinkFile appendEntry(String link, String md5) { + return appendEntry(link, md5, null); + } + + public abstract LinkFile appendEntry(String link, String md5, FileReader reader); + + public void save() { + try (OutputStream os = source.getOutputStream()) { + save(os); + } catch (IOException e) { + throw new GorResourceException("Could not save: " + source.getFullPath(), source.getFullPath(), e); + } + } + + private void save(OutputStream os) { + var content = new StringBuilder(getHeader()); + + if (!entries.isEmpty()) { + var currentTimestamp = System.currentTimeMillis(); + entries.stream() + .skip(Math.max(0, entries.size() - getEntriesCountMax())) + .filter(entry -> entry.timestamp() <= 0 || currentTimestamp - entry.timestamp() <= getEntriesAgeMax()) + .forEach(entry -> content.append(entry.format()).append("\n")); + } + try { + os.write(content.toString().getBytes()); + } catch (IOException e) { + throw new GorResourceException("Could not save: " + source.getFullPath(), source.getFullPath(), e); + } + } + + protected abstract List parseEntries(String content); + + + /** + * Load content from the source, if it exists. + * + * @param source the source to load from + * @return the content of the link file or null if it does not exist (empty indicates version 0 link file). + */ + protected static String loadContentFromSource(StreamSource source) throws IOException { + if (source == null || !source.exists()) { + return null; + } + + if (USE_LINK_CACHE) { + try { + return linkCache.get(source, (k) -> { + try { + return readLimitedLinkContent(k); + } catch (Exception e) { + throw new UncheckedExecutionException(e); + } + }); + } catch (UncheckedExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new IOException(e.getCause()); + } + } else { + return readLimitedLinkContent(source); + } + } + + private static String readLimitedLinkContent(StreamSource source) { + try (InputStream is = source.open()) { + var content = StreamUtils.readString(is, LINK_FILE_MAX_SIZE); + if (content.length() == LINK_FILE_MAX_SIZE) { + throw new GorResourceException(String.format("Link file '%s' too large (> %d bytes).", + source.getFullPath(), LINK_FILE_MAX_SIZE), source.getFullPath()); + } + return content; + } catch (IOException e) { + throw new GorResourceException("Failed to read link file: " + source.getFullPath(), source.getFullPath(), e); + } + } + + +} diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntry.java b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileEntry.java similarity index 58% rename from model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntry.java rename to model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileEntry.java index 222af571..51b1cb9a 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntry.java +++ b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileEntry.java @@ -1,6 +1,6 @@ -package org.gorpipe.gor.driver.utils; +package org.gorpipe.gor.driver.linkfile; -interface LinkFileEntry { +public interface LinkFileEntry { String format(); String url(); diff --git a/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileEntryV0.java b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileEntryV0.java new file mode 100644 index 00000000..0b31b1ce --- /dev/null +++ b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileEntryV0.java @@ -0,0 +1,45 @@ +package org.gorpipe.gor.driver.linkfile; + +import org.gorpipe.util.Strings; + +import java.util.ArrayList; +import java.util.List; + +/** + * Link file entry, old style only containing the URL. + * + * @param url content of the link file, path, url or a gor/nor query. Can be have more than one line. + */ +public record LinkFileEntryV0(String url) implements LinkFileEntry { + public static List parse(String content) { + if (Strings.isNullOrEmpty(content)) { + return new ArrayList<>(); + } + List list = new ArrayList<>(); + list.add(new LinkFileEntryV0(content)); + return list; + } + + public LinkFileEntryV0(String url) { + if (Strings.isNullOrBlank(url)) { + throw new IllegalArgumentException("URL cannot be null or empty"); + } + this.url = url.trim(); + } + + public String format() { + return url; + } + + public long timestamp() { + return 0; // No timestamp in V0 + } + + public String md5() { + return ""; // No md5 in V0 + } + + public int serial() { + return 0; // No serial in V0 + } +} diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV1.java b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileEntryV1.java similarity index 65% rename from model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV1.java rename to model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileEntryV1.java index c85b554d..2133b351 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV1.java +++ b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileEntryV1.java @@ -1,4 +1,4 @@ -package org.gorpipe.gor.driver.utils; +package org.gorpipe.gor.driver.linkfile; import org.gorpipe.exceptions.GorResourceException; import org.gorpipe.util.Strings; @@ -10,14 +10,37 @@ import java.util.stream.Collectors; /** - * Link file entry + * Link file entry, with url, timestamp, md5 and serial. + * + * * @param url content of the link file, path, url or a gor/nor query. Can be have more than one line. * @param timestamp timestamp of the link file entry. Optional. * @param md5 md5 of file or data the link points to. Optional. * @param serial incrementing serial number for the link file entry. Optional. */ -record LinkFileEntryV1(String url, long timestamp, String md5, int serial) implements LinkFileEntry { +public record LinkFileEntryV1(String url, long timestamp, String md5, int serial) implements LinkFileEntry { + + public LinkFileEntryV1(String url, long timestamp, String md5, int serial) { + if (Strings.isNullOrBlank(url)) { + throw new IllegalArgumentException("URL cannot be null or empty"); + } + if (timestamp < 0) { + throw new IllegalArgumentException("Timestamp cannot be negative"); + } + if (serial < 0) { + throw new IllegalArgumentException("Serial cannot be negative"); + } + + this.url = url.trim(); + this.timestamp = timestamp; + this.md5 = md5; + this.serial = serial; + } + public static List parse(String content) { + if (Strings.isNullOrEmpty(content)) { + return new ArrayList<>(); + } return Arrays.stream(content.split("\n")) .filter(l -> !l.startsWith("#")) .map(LinkFileEntryV1::parseLine) diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileMeta.java b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileMeta.java similarity index 51% rename from model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileMeta.java rename to model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileMeta.java index 90cf117e..1664683a 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileMeta.java +++ b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileMeta.java @@ -1,32 +1,41 @@ -package org.gorpipe.gor.driver.utils; +package org.gorpipe.gor.driver.linkfile; +import org.apache.commons.lang3.StringUtils; import org.gorpipe.gor.model.BaseMeta; import org.gorpipe.gor.model.FileReader; +import org.gorpipe.util.Strings; public class LinkFileMeta extends BaseMeta { public static final String HEADER_ENTRIES_COUNT_MAX_KEY = "ENTRIES_COUNT_MAX"; public static final String HEADER_ENTRIES_AGE_MAX_KEY = "ENTRIES_AGE_MAX"; + public static final String[] DEFAULT_TABLE_HEADER = new String[] {"File", "Timestamp", "MD5", "Serial"}; + public static final int DEFAULT_ENTRIES_COUNT_MAX = 100; - public static final long DEFAULT_ENTRIES_AGE_MAX = 315360000000L; + public static final long DEFAULT_ENTRIES_AGE_MAX = Long.MAX_VALUE; - public static LinkFileMeta createAndLoad(FileReader fileReader, String metaPath) { + public static LinkFileMeta createAndLoad(String metaContent) { LinkFileMeta meta = new LinkFileMeta(); - meta.loadAndMergeMeta(fileReader, metaPath); + if (Strings.isNullOrEmpty(metaContent)) { + meta.loadAndMergeMeta(getDefaultMetaContent()); + } else { + meta.loadAndMergeMeta(metaContent); + } return meta; } - public static LinkFileMeta createAndLoad(String metaContent) { - if (metaContent == null) { - metaContent = String.format("## VERSION=1%n" + - "## ENTRIES_COUNT_MAX=%d%n" + - "## ENTRIES_AGE_MAX=%d%n" + - "# FILE\tTIMESTAMP\tMD5\tSERIAL%n", DEFAULT_ENTRIES_COUNT_MAX, DEFAULT_ENTRIES_AGE_MAX); - } + public LinkFileMeta() { + super(); + setFileHeader(DEFAULT_TABLE_HEADER); + saveHeaderLine = true; + } - LinkFileMeta meta = new LinkFileMeta(); - meta.loadAndMergeMeta(metaContent); - return meta; + @Override + protected void parseHeaderLine(String line) { + String columnsString = StringUtils.strip(line, "\n #"); + if (!columnsString.isBlank()) { + setFileHeader(columnsString.split("[\t,]", -1)); + } } public int getEntriesCountMax() { @@ -45,5 +54,10 @@ public void setEntriesAgeMax(int entriesAgeMax) { setProperty(HEADER_ENTRIES_AGE_MAX_KEY, String.valueOf(entriesAgeMax)); } - + public static String getDefaultMetaContent() { + return String.format(""" + ## SERIAL = 0 + ## VERSION = 1 + """); + } } diff --git a/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileV0.java b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileV0.java new file mode 100644 index 00000000..d3cea533 --- /dev/null +++ b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileV0.java @@ -0,0 +1,43 @@ +package org.gorpipe.gor.driver.linkfile; + +import org.gorpipe.gor.driver.providers.stream.sources.StreamSource; +import org.gorpipe.gor.model.FileReader; + +import java.io.IOException; +import java.util.List; + +/** + * Old link file format, version 0. + */ +public class LinkFileV0 extends LinkFile { + + /** + * Load from a source, if it exists, otherwise create an empty link file. + * + * @param source the source to load from + */ + public LinkFileV0(StreamSource source) throws IOException { + super(source, loadContentFromSource(source)); + } + + protected LinkFileV0(StreamSource source, LinkFileMeta meta, String content) { + super(source, meta, content); + } + + @Override + protected String getHeader() { + return ""; + } + + @Override + protected List parseEntries(String content) { + return LinkFileEntryV0.parse(content); + } + + @Override + public LinkFile appendEntry(String link, String md5, FileReader reader) { + entries.clear(); // V0 does not support multiple entries, so we clear the list + entries.add(new LinkFileEntryV0(link)); + return this; + } +} diff --git a/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileV1.java b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileV1.java new file mode 100644 index 00000000..f3a36cdd --- /dev/null +++ b/model/src/main/java/org/gorpipe/gor/driver/linkfile/LinkFileV1.java @@ -0,0 +1,87 @@ +package org.gorpipe.gor.driver.linkfile; + +import org.gorpipe.gor.driver.providers.stream.sources.StreamSource; +import org.gorpipe.gor.model.BaseMeta; +import org.gorpipe.gor.model.FileReader; + +import java.io.IOException; +import java.util.List; + +/** + * Link file format, version 1. + */ +public class LinkFileV1 extends LinkFile { + + private static boolean allowOverwriteOfTargets + = Boolean.parseBoolean(System.getProperty("gor.link.versioned.allow.overwrite", "false")); + + /** + * Load from a source, if it exists, otherwise create an empty link file. + * + * @param source the source to load from + */ + public LinkFileV1(StreamSource source) throws IOException { + super(source, loadContentFromSource(source)); + checkDefaultMeta(); + } + + protected LinkFileV1(StreamSource source, LinkFileMeta meta, String content) { + super(source, meta, content); + checkDefaultMeta(); + } + + @Override + protected List parseEntries(String content) { + return LinkFileEntryV1.parse(content); + } + + @Override + public LinkFile appendEntry(String link, String md5, FileReader reader) { + var latestEntry = getLatestEntry(); + var entry = new LinkFileEntryV1(link, System.currentTimeMillis(), md5, latestEntry != null ? latestEntry.serial() + 1 : 1); + validateEntry(entry, reader); + entries.add(entry); + return this; + } + + /** + * Validate the entry to ensure it is of the correct type, format and does not violate integrity of the link file. + * @param entry the link file entry to validate + */ + private void validateEntry(LinkFileEntry entry, FileReader reader) { + if (!(entry instanceof LinkFileEntryV1)) { + throw new IllegalArgumentException("Invalid entry type: " + entry.getClass().getName()); + } + if (entry.url() == null || entry.url().isEmpty()) { + throw new IllegalArgumentException("Entry URL cannot be null or empty"); + } + if (!allowOverwriteOfTargets) { + for (LinkFileEntry existingEntry : entries) { + if (existingEntry.url().equals(entry.url()) && !canReuseEntryWithSameUrl(existingEntry, entry, reader)) { + throw new IllegalArgumentException("Duplicate entry URL: " + entry.url()); + } + } + } + } + + private boolean canReuseEntryWithSameUrl(LinkFileEntry oldEntry, LinkFileEntry newEntry, FileReader reader) { + // We can reuse an entry if it is they has the same underlying file, as if not the integrity of the + // versioned link file is violated. + + if ((oldEntry.md5() != null && newEntry.md5() != null)) { + // Use md5 if available. + return oldEntry.md5().equals(newEntry.md5()); + } else { + // The old entry timestamp should be newer than the old file (as the file existed before the entry was added). + // If the new file as timestamp larger the old entry timestamp, we assume it is a different underlying file. + return reader == null || oldEntry.timestamp() >= reader.resolveUrl(newEntry.url()).getSourceMetadata().getLastModified(); + } + } + + private void checkDefaultMeta() { + if (!meta.containsProperty(BaseMeta.HEADER_VERSION_KEY)) { + getMeta().loadAndMergeMeta(LinkFileMeta.getDefaultMetaContent()); + meta.setProperty(BaseMeta.HEADER_VERSION_KEY, "1"); + } + } +} diff --git a/model/src/main/java/org/gorpipe/gor/driver/meta/DataType.java b/model/src/main/java/org/gorpipe/gor/driver/meta/DataType.java index ab89e0c8..5d3c4d54 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/meta/DataType.java +++ b/model/src/main/java/org/gorpipe/gor/driver/meta/DataType.java @@ -54,7 +54,7 @@ public enum DataType { GORP(TABLE, ".gorp", true), GORQ(REPORT, ".gorq"), LINK(REFERENCE, ".link"), - LOCAL_LINK(REFERENCE, ".link.local"), + VERSIONED_LINK(REFERENCE, ".versioned.link"), CRAM(VARIANTS, ".cram", true), CRAI(INDEX, ".crai"), SPEC(VARIANTS, ".spec"), @@ -110,7 +110,9 @@ public static boolean isOfType(String file, DataType type) { } private static boolean isLinkToType(String file, DataType type) { - return PathUtils.stripTrailingSlash(file.trim()).toLowerCase().endsWith(type.suffix + DataType.LINK.suffix); + var base = PathUtils.stripTrailingSlash(file.trim()).toLowerCase(); + return base.endsWith(type.suffix + DataType.LINK.suffix) + || base.endsWith(type.suffix + DataType.VERSIONED_LINK.suffix); } public static boolean isOfTypeOrLinksToType(String file, DataType type) { diff --git a/model/src/main/java/org/gorpipe/gor/driver/meta/IndexableSourceReference.java b/model/src/main/java/org/gorpipe/gor/driver/meta/IndexableSourceReference.java index 8c9f82d1..7d6c3b59 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/meta/IndexableSourceReference.java +++ b/model/src/main/java/org/gorpipe/gor/driver/meta/IndexableSourceReference.java @@ -31,17 +31,13 @@ public class IndexableSourceReference extends SourceReference { private final String indexSource; private final String referenceSource; - public IndexableSourceReference(String url, String indexSource, String referenceSource, String securityContext, String commonRoot, ChromoLookup lookup) { - super(url, securityContext, commonRoot, lookup, null, false); + public IndexableSourceReference(String url, String indexSource, String referenceSource, String securityContext, String commonRoot, long queryTime, ChromoLookup lookup) { + super(url, securityContext, commonRoot, queryTime, lookup, null, false, true); this.indexSource = indexSource; this.referenceSource = referenceSource; } - public IndexableSourceReference(String url, IndexableSourceReference parentSourceReference) { - this(url, parentSourceReference, null); - } - public IndexableSourceReference(String url, IndexableSourceReference parentSourceReference, String linkSubPath) { super(url, parentSourceReference, linkSubPath); diff --git a/model/src/main/java/org/gorpipe/gor/driver/meta/SourceReference.java b/model/src/main/java/org/gorpipe/gor/driver/meta/SourceReference.java index d4ec7337..7737ceb5 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/meta/SourceReference.java +++ b/model/src/main/java/org/gorpipe/gor/driver/meta/SourceReference.java @@ -30,6 +30,7 @@ import org.gorpipe.exceptions.GorSystemException; import org.gorpipe.gor.model.ChromoLookup; import org.gorpipe.gor.model.DefaultChromoLookup; +import org.gorpipe.gor.util.DataUtil; import java.io.IOException; import java.util.Objects; @@ -43,7 +44,8 @@ public class SourceReference { public final String url; public final String securityContext; - public final String commonRoot; // TODO: This should be removed? + public final String commonRoot; + public final long queryTime; // Time in milliseconds since epoch to find version of the source. public final boolean writeSource; @JsonIgnore ChromoLookup lookup; @@ -53,13 +55,13 @@ public class SourceReference { private SourceReference parentSourceReference; private boolean isFallback; // Should be called shouldFallback or canFallback, or better yet should be moved to S3Shared. - // TODO: evaluate whether the securityContext, lookup and columns should actually be a part of this class. + // TODO: evaluate whether the securityContext, lookup, queryTime and commonRoot should actually be a part of this class. // - should the context come in at request time? // - should the chromo lookup be retrieved from somewhere instead of pushed into this object? // - common root and security context are not used in all driver types, shouldn't this rather be a hash map? // - should the context hash map be stored as a part of this class or should it enter the chain at some other point? - public SourceReference(String url, String securityContext, String commonRoot, ChromoLookup lookup, + public SourceReference(String url, String securityContext, String commonRoot, long queryTime, ChromoLookup lookup, String linkSubPath, boolean writeSource, boolean isFallback) { this.url = url; // Pick up default security context here - it's not propagated from GorOptions if this is a sub query. @@ -69,6 +71,7 @@ public SourceReference(String url, String securityContext, String commonRoot, Ch this.securityContext = securityContext; } this.commonRoot = commonRoot; + this.queryTime = queryTime; this.lookup = lookup != null ? lookup : new DefaultChromoLookup(); this.linkSubPath = linkSubPath; this.writeSource = writeSource; @@ -77,7 +80,7 @@ public SourceReference(String url, String securityContext, String commonRoot, Ch public SourceReference(String url, String securityContext, String commonRoot, ChromoLookup lookup, String linkSubPath, boolean writeSource) { - this(url, securityContext, commonRoot, lookup, linkSubPath, writeSource, true); + this(url, securityContext, commonRoot, System.currentTimeMillis(), lookup, linkSubPath, writeSource, true); } /** @@ -89,7 +92,7 @@ public SourceReference(String url) { /** * @param url url for the source. - * @param parentSourceReference parent source reference to copy unupplied context from. + * @param parentSourceReference parent source reference to copy unsupplied context from. */ public SourceReference(String url, SourceReference parentSourceReference) { this(url, parentSourceReference, null); @@ -109,8 +112,10 @@ public SourceReference(String url, SourceReference parentSourceReference, String */ public SourceReference(String url, SourceReference parentSourceReference, String linkSubPath, String securityContext) { this(url, securityContext, parentSourceReference.getCommonRoot(), + parentSourceReference.queryTime, parentSourceReference.getLookup(), linkSubPath, - parentSourceReference.isWriteSource()); + parentSourceReference.isWriteSource(), + true); if (this.parentSourceReference == null) { this.parentSourceReference = parentSourceReference; } @@ -118,8 +123,8 @@ public SourceReference(String url, SourceReference parentSourceReference, String @JsonCreator public SourceReference(@JsonProperty("url") String url, @JsonProperty("securityContext") String securityContext, - @JsonProperty("commonRoot") String commonRoot) { - this(url, securityContext, commonRoot, null, null, false); + @JsonProperty("commonRoot") String commonRoot, @JsonProperty("queryTime") long queryTime) { + this(url, securityContext, commonRoot, queryTime, null, null, false, true); } public String getUrl() { @@ -189,7 +194,7 @@ public boolean isFallback() { public SourceReference getTopSourceReference() { SourceReference top = this; while (top.getParentSourceReference() != null - && !top.getParentSourceReference().getUrl().endsWith(DataType.LINK.suffix)) { + && !DataUtil.isLink(top.getParentSourceReference().getUrl())) { top = top.getParentSourceReference(); } return top; @@ -240,48 +245,4 @@ public static SourceReference fromJson(String json) { throw new GorSystemException("Unable to convert JSON to SourceReference. Content:\n" + json, e); } } - - /** - * Builder for the SourceReference, use builder copy constructor to allow copying fields from parent SourceReference. - */ - public static class Builder { - private final String url; - private String securityContext; - private String commonRoot; - private ChromoLookup lookup; - private int[] columns; - private String linkSubPath; - - public Builder(String url) { - this.url = url; - } - - public Builder(String url, SourceReference parentSourceReference) { - this.url = url; - // Don't copy objects, we want to share the instance with the parent. - this.securityContext = parentSourceReference.securityContext; - this.commonRoot = parentSourceReference.commonRoot; - this.lookup = parentSourceReference.lookup; - this.linkSubPath = parentSourceReference.linkSubPath; - } - - public SourceReference build() { - return new SourceReference(url, securityContext, commonRoot, lookup, linkSubPath, false); - } - - public Builder securityContext(String securityContext) { - this.securityContext = securityContext; - return this; - } - - public Builder commonRoot(String commonRoot) { - this.commonRoot = commonRoot; - return this; - } - - public Builder lookup(ChromoLookup lookup) { - this.lookup = lookup; - return this; - } - } } diff --git a/model/src/main/java/org/gorpipe/gor/driver/meta/SourceReferenceBuilder.java b/model/src/main/java/org/gorpipe/gor/driver/meta/SourceReferenceBuilder.java index 5c87f43d..2e03bbc3 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/meta/SourceReferenceBuilder.java +++ b/model/src/main/java/org/gorpipe/gor/driver/meta/SourceReferenceBuilder.java @@ -31,9 +31,11 @@ public class SourceReferenceBuilder { private final String url; private String securityContext; private String commonRoot; + private long queryTime = System.currentTimeMillis(); private ChromoLookup lookup; private boolean writeSource = false; private boolean isFallBack = true; + private String linkSubPath; public SourceReferenceBuilder(String url) { this.url = url; @@ -44,13 +46,15 @@ public SourceReferenceBuilder(String url, SourceReference parentSourceReference) // Don't copy objects, we want to share the instance with the parent. this.securityContext = parentSourceReference.securityContext; this.commonRoot = parentSourceReference.commonRoot; + this.queryTime = parentSourceReference.queryTime; this.lookup = parentSourceReference.lookup; this.writeSource = parentSourceReference.writeSource; this.isFallBack = parentSourceReference.isFallback(); + this.linkSubPath = parentSourceReference.getLinkSubPath(); } public SourceReference build() { - return new SourceReference(url, securityContext, commonRoot, lookup, null, writeSource, isFallBack); + return new SourceReference(url, securityContext, commonRoot, queryTime, lookup, linkSubPath, writeSource, isFallBack); } public SourceReferenceBuilder securityContext(String securityContext) { @@ -63,6 +67,11 @@ public SourceReferenceBuilder commonRoot(String commonRoot) { return this; } + public SourceReferenceBuilder queryTime(long queryTime) { + this.queryTime = queryTime; + return this; + } + public SourceReferenceBuilder lookup(ChromoLookup lookup) { this.lookup = lookup; return this; @@ -77,4 +86,9 @@ public SourceReferenceBuilder isFallback(boolean isFallBack) { this.isFallBack = isFallBack; return this; } + + public SourceReferenceBuilder linkSubPath(String linkSubPath) { + this.linkSubPath = linkSubPath; + return this; + } } diff --git a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/StreamSourceProvider.java b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/StreamSourceProvider.java index 8b232b32..f333bdd8 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/StreamSourceProvider.java +++ b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/StreamSourceProvider.java @@ -22,24 +22,16 @@ package org.gorpipe.gor.driver.providers.stream; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.util.concurrent.UncheckedExecutionException; import org.gorpipe.gor.driver.DataSource; import org.gorpipe.gor.driver.GorDriverConfig; import org.gorpipe.gor.driver.GorDriverFactory; import org.gorpipe.gor.driver.SourceProvider; -import org.gorpipe.gor.driver.meta.DataType; -import org.gorpipe.gor.driver.meta.FileNature; -import org.gorpipe.gor.driver.meta.IndexableSourceReference; -import org.gorpipe.gor.driver.meta.SourceReference; +import org.gorpipe.gor.driver.meta.*; import org.gorpipe.gor.driver.providers.stream.sources.StreamSource; -import org.gorpipe.gor.driver.providers.stream.sources.file.FileSource; import org.gorpipe.gor.driver.providers.stream.sources.wrappers.CachedSourceWrapper; import org.gorpipe.gor.driver.providers.stream.sources.wrappers.ExtendedRangeWrapper; import org.gorpipe.gor.driver.providers.stream.sources.wrappers.FullRangeWrapper; import org.gorpipe.gor.driver.providers.stream.sources.wrappers.RetryStreamSourceWrapper; -import org.gorpipe.gor.driver.utils.LinkFile; import org.gorpipe.gor.driver.utils.RetryHandlerBase; import org.gorpipe.gor.model.FileReader; import org.gorpipe.gor.model.GenomicIterator; @@ -52,15 +44,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.TimeUnit; public abstract class StreamSourceProvider implements SourceProvider { - protected final Logger log = LoggerFactory.getLogger(getClass()); - private static final boolean USE_LINK_CACHE = Boolean.parseBoolean(System.getProperty("gor.driver.cache.link", "true")); - private static final Cache linkCache = Caffeine.newBuilder() - .maximumSize(10000) - .expireAfterWrite(2, TimeUnit.HOURS).build(); private final Map dataTypeToFactory = new HashMap<>(); private FileCache cache; @@ -105,43 +91,6 @@ private void register(StreamSourceIteratorFactory factory) { } } - /** - * Read link and resolve content. - * - * @return Path linked to - */ - @Override - public String readLink(DataSource source) throws IOException { - if (USE_LINK_CACHE) { - try { - return linkCache.get(source, (k) -> { - try { - return readLinkContent(k); - } catch (IOException e) { - throw new UncheckedExecutionException(e); - } - }); - } catch (UncheckedExecutionException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - throw new IOException(e.getCause()); - } - } else { - return readLinkContent(source); - } - } - - private String readLinkContent(DataSource source) throws IOException { - try { - return LinkFile.load((StreamSource)source).getLatestEntryUrl(); - } finally { - if (source instanceof FileSource) { //FileSource handling is a special case due to FileSource.close() implementation - source.close(); - } - } - } - /** * Wrap core data source with wrappers providing extended functionality */ @@ -215,7 +164,7 @@ public GenomicIterator createIterator(DataSource source) throws IOException { StreamSource idxSource; if (indexUrl != null) { - idxSource = (StreamSource) GorDriverFactory.fromConfig().getDataSource(new SourceReference(indexUrl)); + idxSource = (StreamSource) GorDriverFactory.fromConfig().getDataSource(new SourceReference(indexUrl, sourceRef)); } else { // Find from the driver idxSource = findIndexFileFromFileDriver(file, sourceRef); @@ -236,7 +185,7 @@ public GenomicIterator createIterator(DataSource source) throws IOException { StreamSource refSource = null; if (referenceUrl != null) { - refSource = (StreamSource) GorDriverFactory.fromConfig().getDataSource(new SourceReference(referenceUrl)); + refSource = (StreamSource) GorDriverFactory.fromConfig().getDataSource(new SourceReference(referenceUrl, sourceRef)); } file.setReferenceSource(refSource); diff --git a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/StreamUtils.java b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/StreamUtils.java index b647cd8b..a12f063e 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/StreamUtils.java +++ b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/StreamUtils.java @@ -118,7 +118,7 @@ public static long readRangeToStream(InputStream input, RequestRange range, Outp public static String readString(InputStream stream, int maxLength) throws IOException { byte[] buf = new byte[maxLength]; int read = readToBuffer(stream, buf, 0, maxLength); - return new String(buf, 0, read); + return read > 0 ? new String(buf, 0, read) : ""; } /** diff --git a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/http/HTTPSource.java b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/http/HTTPSource.java index f78a1dd2..eef7e660 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/http/HTTPSource.java +++ b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/sources/http/HTTPSource.java @@ -158,7 +158,7 @@ public StreamSourceMetadata getSourceMetadata() { if (sourceMetadata == null && exists == null) { HttpURLConnection urlc = null; try { - urlc = createBaseUrlConnection(); + urlc = createBaseUrlConnection("HEAD"); log.debug("Reading source metadata from {} using HEAD", url); urlc.setRequestMethod("HEAD"); urlc.connect(); @@ -193,6 +193,9 @@ public StreamSourceMetadata getSourceMetadata() { } } catch (IOException e) { throw new GorResourceException("Error reading source metadata from " + url.toString(), url.toString(), e).retry(); + } catch (Exception e) { + // Catch any other exception and throw a GorResourceException + throw new GorResourceException("Error reading source metadata from " + url.toString(), url.toString(), e).retry(); } finally { if (urlc != null) urlc.disconnect(); @@ -257,8 +260,13 @@ public void close() { } private HttpURLConnection createBaseUrlConnection() throws IOException { + return createBaseUrlConnection("GET"); + } + + private HttpURLConnection createBaseUrlConnection(String requestMethod) throws IOException { HttpURLConnection http = (HttpURLConnection) url.openConnection(); http.setUseCaches(false); + http.setRequestMethod(requestMethod); http.setInstanceFollowRedirects(true); http.setDoInput(true); http.setDoOutput(false); diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFile.java b/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFile.java deleted file mode 100644 index 06e8d053..00000000 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFile.java +++ /dev/null @@ -1,167 +0,0 @@ -package org.gorpipe.gor.driver.utils; - -import org.gorpipe.exceptions.GorResourceException; -import org.gorpipe.gor.driver.providers.stream.StreamUtils; -import org.gorpipe.gor.driver.providers.stream.sources.StreamSource; -import org.gorpipe.gor.table.util.PathUtils; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; - -/** - * Class to work with link files, read, write and access metadata. - * - * Link file format, a valid nor format. Note, the required fields form the current link file format. - * - * ## VERSION= - * ## ENTRIES_COUNT_MAX= - * ## ENTRIES_AGE_MAX= - * # FILE\tTIMESTAMP\tMD5\tSERIAL - * source/var/var.gorz\t1734304890790\tABCDEAF13422\t1 - * source/var/var.gorz\t1734305124533\t334DEAF13422\t2 - * - * Empty timestamp or serial are always considered as 0 (older). - */ -public class LinkFile { - - public static final int LINK_FILE_MAX_SIZE = 10000; - - // Defaults to creating versioned link files. - public static LinkFile create(StreamSource source, String content) { - return new LinkFile(source, content); - } - - public static LinkFile load(StreamSource source) { - return new LinkFile(source); - } - - private final StreamSource source; - private final LinkFileMeta meta; - private final List entries; // Entries sorted by time (oldest first) - - // Create new link file from content. - public LinkFile(StreamSource source, String content) { - this.source = source; - this.meta = LinkFileMeta.createAndLoad(content); - this.entries = parseEntries(content); - } - - // Load from source - public LinkFile(StreamSource source) { - this(source, loadContentFromSource(source)); - } - - public LinkFileMeta getMeta() { - return meta; - } - - public String getPath() { - return source.getFullPath(); - } - - public void appendEntry(String link, String md5) { - entries.add(new LinkFileEntryV1(link, System.currentTimeMillis(), md5, getLatestEntry().serial() + 1)); - } - - public String getEntryUrl(long timestamp) { - return getUrlFromEntry(getEntry(timestamp)); - } - - public String getLatestEntryUrl() { - return getUrlFromEntry(getLatestEntry()); - } - - private String getUrlFromEntry(LinkFileEntry entry) { - var linkUrl = entry != null ? entry.url() : null; - if (linkUrl != null && !PathUtils.isAbsolutePath(linkUrl) && this.source != null) { - // Allow relative links: - linkUrl = PathUtils.resolve(PathUtils.getParent(this.source.getFullPath()), linkUrl); - } - return linkUrl; - } - - List getEntries() { - return entries; - } - - /** - * Get the entry that matches the timestamp. - * @param timestamp timestamp to match - * @return best match entry or null if no entries. - */ - LinkFileEntry getEntry(long timestamp) { - int index = entries.size() - 1; - while (index >= 0 && entries.get(index).timestamp() > timestamp) { - index--; - } - return index >= 0 ? entries.get(index) : null; - } - - /** - * Get the latest entry. - * @return the latest entry - */ - LinkFileEntry getLatestEntry() { - return entries != null && !entries.isEmpty() ? entries.get(entries.size() - 1) : null; - } - - public void setEntriesCountMax(int entriesCountMax) { - meta.setEntriesCountMax(entriesCountMax); - } - - public int getEntriesCountMax() { - return meta.getEntriesCountMax(); - } - - public void setEntriesAgeMax(int entriesAgeMax) { - meta.setEntriesAgeMax(entriesAgeMax); - } - - public long getEntriesAgeMax() { - return meta.getEntriesAgeMax(); - } - - public void save(OutputStream os) { - var content = switch (getMeta().getVersion()) { - case "1" -> new StringBuilder(meta.formatHeader()); - default -> new StringBuilder(); - }; - - if (!entries.isEmpty()) { - var currentTimestamp = System.currentTimeMillis(); - entries.stream() - .skip(Math.max(0, entries.size() - getEntriesCountMax())) - .filter(entry -> entry.timestamp() <= 0 || entry.timestamp() + getEntriesAgeMax() >= currentTimestamp) - .forEach(entry -> content.append(entry.format()).append("\n")); - } - try { - os.write(content.toString().getBytes()); - } catch (IOException e) { - throw new GorResourceException("Could not save: " + source.getFullPath(), source.getFullPath(), e); - } - } - - private List parseEntries(String content) { - return switch (getMeta().getVersion()) { - case "1" -> LinkFileEntryV1.parse(content); - default -> List.of(LinkFileEntryV0.parse(content)); - }; - } - - - private static String loadContentFromSource(StreamSource source) { - try (InputStream is = source.open()) { - var content = StreamUtils.readString(is, LINK_FILE_MAX_SIZE); - if (content.length() == LINK_FILE_MAX_SIZE) { - throw new GorResourceException(String.format("Link file '%s' too large (> %d bytes).", - source.getFullPath(), LINK_FILE_MAX_SIZE), source.getFullPath()); - } - return content; - } catch (IOException e) { - throw new GorResourceException("Failed to read link file: " + source.getFullPath(), source.getFullPath(), e); - } - } - -} diff --git a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV0.java b/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV0.java deleted file mode 100644 index 7e3e10bf..00000000 --- a/model/src/main/java/org/gorpipe/gor/driver/utils/LinkFileEntryV0.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.gorpipe.gor.driver.utils; - -/** - * Link file entry - * @param url content of the link file, path, url or a gor/nor query. Can be have more than one line. - */ -record LinkFileEntryV0(String url) implements LinkFileEntry { - public static LinkFileEntryV0 parse(String content) { - return new LinkFileEntryV0(content.trim()); - } - - public String format() { - return url; - } - - public long timestamp() { - return 0; // No timestamp in V0 - } - - public String md5() { - return ""; // No md5 in V0 - } - - public int serial() { - return 0; // No serial in V0 - } -} diff --git a/model/src/main/java/org/gorpipe/gor/model/BaseMeta.java b/model/src/main/java/org/gorpipe/gor/model/BaseMeta.java index 7bdc9fa9..3c28eff2 100644 --- a/model/src/main/java/org/gorpipe/gor/model/BaseMeta.java +++ b/model/src/main/java/org/gorpipe/gor/model/BaseMeta.java @@ -306,10 +306,6 @@ protected List extractMetaReader(BufferedReader br) throws IOException { return metaLines; } - public void loadAndMergeMeta(Path metaPath) { - loadAndMergeMeta(new DriverBackedFileReader(""), metaPath.toString()); - } - public String getMetaPath() { return this.metaPathStr; } @@ -355,4 +351,9 @@ protected void saveAs(FileReader fileReader, String fileName) { throw new GorSystemException(String.format("Could not save meta file %s", fileName), ioe); } } + + @Override + public String toString() { + return formatHeader(); + } } diff --git a/model/src/main/java/org/gorpipe/gor/model/DbConnectionCache.java b/model/src/main/java/org/gorpipe/gor/model/DbConnectionCache.java index 7d9e2900..f70db082 100644 --- a/model/src/main/java/org/gorpipe/gor/model/DbConnectionCache.java +++ b/model/src/main/java/org/gorpipe/gor/model/DbConnectionCache.java @@ -21,6 +21,9 @@ public class DbConnectionCache { private static final Logger log = LoggerFactory.getLogger(DbConnectionCache.class); + /** + * Map of DbConnection objects, keyed by the source name in lower case, to make the name case insensitive. + */ private final ConcurrentHashMap mapSources = new ConcurrentHashMap<>(); public String defaultDbSource = "rda"; @@ -38,7 +41,7 @@ public DbConnectionCache(String defaultDbSource) { * @return The DbSource object */ public DbConnection lookup(String source) { - return mapSources.get(source); + return mapSources.get(source.toLowerCase()); } /** @@ -120,7 +123,7 @@ private void installDbSourceFromParts(String[] parts) throws ClassNotFoundExcept public void clearDbSources() { for (DbConnection src : mapSources.values()) { src.close(); - mapSources.remove(src.name); + mapSources.remove(src.name.toLowerCase()); } mapSources.clear(); } @@ -129,13 +132,14 @@ public void clearDbSources() { * @param source The source to install as available */ public void install(final DbConnection source) { - log.info("Installing DbSource with name {}, url {} and user {}", source.name, source.url, source.user); - if (mapSources.containsKey(source.name)) { - DbConnection existingSource = mapSources.get(source.name); - log.warn("Installing over an existing source with name {}, url {} and user {}", + log.info("Installing DbSource with name: {}, url: {} and user: {}", source.name, source.url, source.user); + String key = source.name.toLowerCase(); + if (mapSources.containsKey(key)) { + DbConnection existingSource = mapSources.get(key); + log.warn("Installing over an existing source with name: {}, url: {} and user: {}", existingSource.name, existingSource.url, existingSource.user); } - mapSources.put(source.name, source); + mapSources.put(key, source); } diff --git a/model/src/main/java/org/gorpipe/gor/model/DriverBackedFileReader.java b/model/src/main/java/org/gorpipe/gor/model/DriverBackedFileReader.java index 2dbb26bc..97c14f41 100644 --- a/model/src/main/java/org/gorpipe/gor/model/DriverBackedFileReader.java +++ b/model/src/main/java/org/gorpipe/gor/model/DriverBackedFileReader.java @@ -31,7 +31,6 @@ import org.gorpipe.gor.driver.GorDriverFactory; import org.gorpipe.gor.driver.PluggableGorDriver; import org.gorpipe.gor.driver.SourceProvider; -import org.gorpipe.gor.driver.adapters.PositionAwareInputStream; import org.gorpipe.gor.driver.adapters.StreamSourceRacFile; import org.gorpipe.gor.driver.meta.DataType; import org.gorpipe.gor.driver.meta.SourceReference; @@ -73,18 +72,24 @@ public class DriverBackedFileReader extends FileReader { private final String securityContext; protected final String commonRoot; + protected long queryTime; public DriverBackedFileReader(String securityContext) { - this(securityContext, null); + this(securityContext, null, System.currentTimeMillis()); } public DriverBackedFileReader(String securityContext, String commonRoot) { + this(securityContext, commonRoot, System.currentTimeMillis()); + } + + public DriverBackedFileReader(String securityContext, String commonRoot, long queryTime) { this.securityContext = securityContext; if ((commonRoot == null || commonRoot.length() < 1) && GorStandalone.isStandalone()) { this.commonRoot = PathUtils.markAsFolder(GorStandalone.getStandaloneRoot()); } else { this.commonRoot = StringUtil.isEmpty(commonRoot) ? DEFAULT_COMMON_ROOT : PathUtils.markAsFolder(commonRoot); } + this.queryTime = queryTime; } @Override @@ -92,10 +97,20 @@ public String getCommonRoot() { return commonRoot; } + @Override + public long getQueryTime() { + return queryTime; + } + + @Override + public void setQueryTime(long queryTime) { + this.queryTime = queryTime; + } + @Override public SourceReference createSourceReference(String url, boolean writeable) { url = convertUrl(url); - return new SourceReferenceBuilder(url).commonRoot(commonRoot).securityContext(securityContext).writeSource(writeable).build(); + return new SourceReferenceBuilder(url).commonRoot(getCommonRoot()).queryTime(getQueryTime()).securityContext(getSecurityContext()).writeSource(writeable).build(); } @Override @@ -127,19 +142,6 @@ public DataSource resolveDataSource(SourceReference sourceReference) throws IOEx return dataSource; } - @Override - public String readLink(String url) { - try (DataSource source = resolveDataSource(createSourceReference(url, false))) { - if (source == null) { - throw new GorResourceException("Could not read link, invalid uri", url, null); - } else { - return GorDriverFactory.fromConfig().readLink(source); - } - } catch (IOException e) { - throw new GorResourceException("Could not read link", url, e); - } - } - @Override public String getSecurityContext() { return securityContext; diff --git a/model/src/main/java/org/gorpipe/gor/model/DriverBackedSecureFileReader.java b/model/src/main/java/org/gorpipe/gor/model/DriverBackedSecureFileReader.java index fe146f10..ebe9eb5d 100644 --- a/model/src/main/java/org/gorpipe/gor/model/DriverBackedSecureFileReader.java +++ b/model/src/main/java/org/gorpipe/gor/model/DriverBackedSecureFileReader.java @@ -27,7 +27,6 @@ import org.gorpipe.gor.auth.GorAuthRoleMatcher; import org.gorpipe.gor.auth.SecurityPolicy; import org.gorpipe.exceptions.GorResourceException; -import org.gorpipe.exceptions.GorSystemException; import org.gorpipe.gor.driver.DataSource; import org.gorpipe.gor.table.util.PathUtils; import org.gorpipe.gor.util.DataUtil; @@ -38,10 +37,8 @@ import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.stream.Stream; /** * Gor server file reader. @@ -65,7 +62,12 @@ public class DriverBackedSecureFileReader extends DriverBackedFileReader { */ public DriverBackedSecureFileReader(String commonRoot, String securityContext, AccessControlContext accessControlContext) { - super(securityContext, commonRoot); + this(commonRoot, securityContext, accessControlContext, System.currentTimeMillis()); + } + + public DriverBackedSecureFileReader(String commonRoot, + String securityContext, AccessControlContext accessControlContext, long time) { + super(securityContext, commonRoot, time); this.accessControlContext = accessControlContext != null ? accessControlContext : AccessControlContext.builder().build(); } @@ -226,7 +228,7 @@ public void validateServerURIs(URI... uris) { @Override public DriverBackedFileReader unsecure() { if (unsecure == null) { - unsecure = new DriverBackedFileReader(getSecurityContext(), getCommonRoot()); + unsecure = new DriverBackedFileReader(getSecurityContext(), getCommonRoot(), getQueryTime()); } return unsecure; } diff --git a/model/src/main/java/org/gorpipe/gor/model/FileReader.java b/model/src/main/java/org/gorpipe/gor/model/FileReader.java index ac0d7dfd..10b625ac 100644 --- a/model/src/main/java/org/gorpipe/gor/model/FileReader.java +++ b/model/src/main/java/org/gorpipe/gor/model/FileReader.java @@ -27,7 +27,7 @@ import org.gorpipe.gor.driver.DataSource; import org.gorpipe.gor.driver.meta.SourceReference; import org.gorpipe.gor.driver.providers.stream.sources.StreamSource; -import org.gorpipe.gor.driver.utils.LinkFile; +import org.gorpipe.gor.driver.linkfile.LinkFile; import org.gorpipe.gor.table.util.PathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -329,15 +329,12 @@ public DataSource resolveUrl(SourceReference sourceReference) { */ public abstract DataSource resolveDataSource(SourceReference sourceReference) throws IOException ; - /** - * Read link file and return the underlying file, following all link files. - * @param url path to the link file - * @return the underlying file - */ - public abstract String readLink(String url); - public abstract String getCommonRoot(); + public abstract long getQueryTime(); + + public abstract void setQueryTime(long queryTime); + public abstract SourceReference createSourceReference(String url, boolean writeable); // @@ -345,11 +342,8 @@ public void writeLinkIfNeeded(String url) throws IOException { if (Strings.isNullOrEmpty(url)) return; DataSource dataSource = resolveUrl(url, true); if (dataSource.forceLink()) { - try (OutputStream os = getOutputStream(dataSource.getProjectLinkFile())) { - // Create link file with the content of the data source. - LinkFile.create((StreamSource) dataSource, dataSource.getProjectLinkFileContent()) - .save(os); - } + DataSource linkDataSource = resolveUrl(dataSource.getProjectLinkFile(), true); + LinkFile.load((StreamSource) linkDataSource, dataSource.getProjectLinkFileContent()).save(); } } diff --git a/model/src/main/java/org/gorpipe/gor/model/GorMeta.java b/model/src/main/java/org/gorpipe/gor/model/GorMeta.java index 31912db3..513b8a81 100644 --- a/model/src/main/java/org/gorpipe/gor/model/GorMeta.java +++ b/model/src/main/java/org/gorpipe/gor/model/GorMeta.java @@ -18,12 +18,6 @@ public static GorMeta createAndLoad(FileReader fileReader, String metaPath) { return meta; } - public static GorMeta createAndLoad(Path metaPath) { - GorMeta meta = new GorMeta(); - meta.loadAndMergeMeta(metaPath); - return meta; - } - public GorMeta() { super(); } diff --git a/model/src/main/java/org/gorpipe/gor/model/GorOptions.java b/model/src/main/java/org/gorpipe/gor/model/GorOptions.java index c3c33708..a8d80fb0 100644 --- a/model/src/main/java/org/gorpipe/gor/model/GorOptions.java +++ b/model/src/main/java/org/gorpipe/gor/model/GorOptions.java @@ -57,6 +57,7 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Instant; import java.util.*; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; @@ -290,7 +291,7 @@ public static GorOptions createGorOptions(String query, org.gorpipe.gor.model.Fi public static GorOptions createGorOptions(GorContext context, String[] arguments) { Tuple2 result = CommandParseUtilities.validateCommandArguments(arguments, new CommandArguments("-fs -nf -Y -P -stdin -n -nowithin", - "-f -ff -m -M -p -sc -ref -idx -Z -H -z -X -s -b -dict -parts -seek", -1, -1, false)); + "-f -ff -m -M -p -sc -ref -idx -Z -H -z -X -s -b -dict -parts -seek -time", -1, -1, false)); String[] inputArguments = result._1; String[] illegalArguments = result._2; @@ -329,6 +330,7 @@ public GorOptions(GorContext context, String[] iargs, String[] options) { String monId = CommandParseUtilities.stringValueOfOptionWithDefault(options, "-H", null); String securityKey = CommandParseUtilities.replaceSingleQuotes(CommandParseUtilities.stringValueOfOptionWithDefault(options, "-Z", null)); int tmpParallelBlocks = CommandParseUtilities.intValueOfOptionWithDefault(options, "-z", 0); + long queryTime = CommandParseUtilities.epochValueOfOptionWithDefault(options, "-time", -1L); this.columnTags = tagsFromOptions(session, options); hasTagFiltering = this.columnTags != null; @@ -356,6 +358,9 @@ public GorOptions(GorContext context, String[] iargs, String[] options) { this.parallelBlocks = tmpParallelBlocks; this.monitor = monId != null ? ResourceMonitor.find(monId) : null; + if (queryTime >= 0) { + this.context.getSession().getProjectContext().getFileReader().setQueryTime(queryTime); + } if (CommandParseUtilities.hasOption(options, "-p")) { // Need to get the last range if multiple ranges are given diff --git a/model/src/main/java/org/gorpipe/gor/model/GorParallelQueryHandler.java b/model/src/main/java/org/gorpipe/gor/model/GorParallelQueryHandler.java index bafda3fe..00e8a691 100644 --- a/model/src/main/java/org/gorpipe/gor/model/GorParallelQueryHandler.java +++ b/model/src/main/java/org/gorpipe/gor/model/GorParallelQueryHandler.java @@ -49,13 +49,6 @@ public interface GorParallelQueryHandler { */ void setForce(boolean force); - /** - * Submission time (if available) in seconds from Jan 1st 1970 (i.e. like System.currentTimeMillis()/1000) - * - * @param time Time in seconds (From Jan 1st 1970) - */ - void setQueryTime(Long time); - /** * Get time spent waiting for jobs * diff --git a/model/src/main/java/org/gorpipe/gor/model/SourceRef.java b/model/src/main/java/org/gorpipe/gor/model/SourceRef.java index 4033ea83..7d105e00 100644 --- a/model/src/main/java/org/gorpipe/gor/model/SourceRef.java +++ b/model/src/main/java/org/gorpipe/gor/model/SourceRef.java @@ -381,7 +381,8 @@ private static GenomicIterator iterateFile(String file, String index, String ref try { var isMem = file.startsWith("mem:"); if (!isMem && GorDriverFactory.fromConfig().config().enabled()) { - SourceReference sourceReference = new IndexableSourceReference(file, index, reference, securityContext, commonRoot, lookup); + SourceReference sourceReference = new IndexableSourceReference(file, index, reference, securityContext, + commonRoot, session != null ? session.getProjectContext().getFileReader().getQueryTime() : Long.MAX_VALUE, lookup); GenomicIterator newIt; if (session != null) { // Use the datasource if possible. diff --git a/model/src/main/java/org/gorpipe/gor/session/ProjectContext.java b/model/src/main/java/org/gorpipe/gor/session/ProjectContext.java index 9c435dc2..79a9c832 100644 --- a/model/src/main/java/org/gorpipe/gor/session/ProjectContext.java +++ b/model/src/main/java/org/gorpipe/gor/session/ProjectContext.java @@ -47,7 +47,7 @@ public class ProjectContext { - public static final FileReader DEFAULT_READER = new DriverBackedFileReader(System.getProperty("gor.security.context", ""), "."); + public static final FileReader DEFAULT_READER = new DriverBackedFileReader(System.getProperty("gor.security.context", ""), ".", Long.MAX_VALUE); public static final String DEFAULT_CACHE_DIR = System.getProperty("java.io.tmpdir"); private String aliasFile; @@ -155,7 +155,7 @@ public ProjectContext build() { projectContext.fileReader = fileReader; projectContext.systemFileReader = systemFileReader != null ? systemFileReader - : new DriverBackedFileReader(fileReader.getSecurityContext(), fileReader.getCommonRoot()); + : new DriverBackedFileReader(fileReader.getSecurityContext(), fileReader.getCommonRoot(), fileReader.getQueryTime() ); projectContext.logDirectory = logDirectory; projectContext.projectName = projectName; projectContext.queryEvaluator = queryEvaluator; diff --git a/model/src/main/java/org/gorpipe/gor/table/dictionary/DictionaryTableReader.java b/model/src/main/java/org/gorpipe/gor/table/dictionary/DictionaryTableReader.java index c05b912b..b76ed122 100644 --- a/model/src/main/java/org/gorpipe/gor/table/dictionary/DictionaryTableReader.java +++ b/model/src/main/java/org/gorpipe/gor/table/dictionary/DictionaryTableReader.java @@ -218,13 +218,13 @@ public String getSignature(Boolean useCommonRoot, String commonRoot, String... t for (T line : matchingLines) { fingerPrintString.append(getContentReal(line)); fingerPrintString.append((byte) '&'); - fingerPrintString.append(getLastModifiedTime(getContentReal(line), getSecurityContext(), commonRoot)); + fingerPrintString.append(getLastModifiedTime(getContentReal(line), getSecurityContext(), commonRoot, fileReader.getQueryTime())); } } else { fingerPrintString = new ByteTextBuilder(300); fingerPrintString.append(getPath()); fingerPrintString.append((byte) '&'); - fingerPrintString.append(getLastModifiedTime(getPath(), getSecurityContext(), commonRoot)); + fingerPrintString.append(getLastModifiedTime(getPath(), getSecurityContext(), commonRoot, fileReader.getQueryTime())); } return fingerPrintString.md5(); @@ -251,10 +251,10 @@ public long getLastModified(Boolean useCommonRoot, String commonRoot, String... // Empty tags here means no tags so replace with null. List matchingLines = filter().tags(tags).get(); for (T line : matchingLines) { - lastModified = Math.max(lastModified, getLastModifiedTime(getContentReal(line), getSecurityContext(), commonRoot)); + lastModified = Math.max(lastModified, getLastModifiedTime(getContentReal(line), getSecurityContext(), commonRoot, fileReader.getQueryTime())); } } else { - lastModified = Math.max(lastModified, getLastModifiedTime(getPath().toString(), getSecurityContext(), commonRoot)); + lastModified = Math.max(lastModified, getLastModifiedTime(getPath().toString(), getSecurityContext(), commonRoot, fileReader.getQueryTime())); } return lastModified; diff --git a/model/src/main/java/org/gorpipe/gor/table/livecycle/TableInfoBase.java b/model/src/main/java/org/gorpipe/gor/table/livecycle/TableInfoBase.java index 7fbe47d1..815f1374 100644 --- a/model/src/main/java/org/gorpipe/gor/table/livecycle/TableInfoBase.java +++ b/model/src/main/java/org/gorpipe/gor/table/livecycle/TableInfoBase.java @@ -280,15 +280,22 @@ public void updateValidateHeader(String file) { } else { // Validate the header. if (this.header.getColumns().length != lineHeader.getColumns().length) { - throw new GorDataException(String.format("Can not update dictionary %s. The number of columns does not match (dict: %d, line: %d)", - getPath(), this.header.getColumns().length, lineHeader.getColumns().length), -1, lineHeader.toString(), header.toString()); + throw new GorDataException( + String.format("Can not update dictionary %s. The number of columns does not match (dict: %d (%s), line: %d (%s))", + getPath(), + this.header.getColumns().length, String.join(",", this.header.getColumns()), + lineHeader.getColumns().length, String.join(",", lineHeader.getColumns())), + -1, lineHeader.toString(), header.toString()); } if (FORCE_SAME_COLUMN_NAMES && this.header.isProper() && lineHeader.isProper() && !String.join(",", this.header.getColumns()).equals(String.join(",", lineHeader.getColumns()))) { - throw new GorDataException(String.format("Can not update dictionary %s. The columns do not match (dict: %s vs line: %s)", - getPath(), String.join(",", this.header.getColumns()), String.join(",", - lineHeader.getColumns())), -1, lineHeader.toString(), header.toString() ); + throw new GorDataException( + String.format("Can not update dictionary %s. The columns do not match (dict: %s vs line: %s)", + getPath(), + String.join(",", this.header.getColumns()), + String.join(",", lineHeader.getColumns())), + -1, lineHeader.toString(), header.toString() ); } } } diff --git a/model/src/main/java/org/gorpipe/gor/table/util/PathUtils.java b/model/src/main/java/org/gorpipe/gor/table/util/PathUtils.java index 63223dcd..bc196e84 100644 --- a/model/src/main/java/org/gorpipe/gor/table/util/PathUtils.java +++ b/model/src/main/java/org/gorpipe/gor/table/util/PathUtils.java @@ -314,9 +314,10 @@ public static boolean isLocal(String path) { return !path.contains(":") || path.startsWith("file:"); } - public static long getLastModifiedTime(String fileName, String securityContext, String commonRoot) throws IOException { + public static long getLastModifiedTime(String fileName, String securityContext, String commonRoot, long queryTime) throws IOException { //TODO: This method should really take in SourceReference or better yet be removed and replaced with calls to datasource.getUniqueId - DataSource ds = GorDriverFactory.fromConfig().getDataSource(new SourceReferenceBuilder(fileName).securityContext(securityContext).commonRoot(commonRoot).build()); + DataSource ds = GorDriverFactory.fromConfig().getDataSource( + new SourceReferenceBuilder(fileName).securityContext(securityContext).commonRoot(commonRoot).queryTime(queryTime).build()); if (ds != null) { return ds.getSourceMetadata().getLastModified(); } else { diff --git a/model/src/main/java/org/gorpipe/gor/util/DataUtil.java b/model/src/main/java/org/gorpipe/gor/util/DataUtil.java index a6d33cbf..abbffb24 100644 --- a/model/src/main/java/org/gorpipe/gor/util/DataUtil.java +++ b/model/src/main/java/org/gorpipe/gor/util/DataUtil.java @@ -3,6 +3,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.gorpipe.exceptions.GorDataException; import org.gorpipe.gor.driver.meta.DataType; +import org.gorpipe.gor.table.util.PathUtils; import java.nio.file.Files; @@ -124,6 +125,14 @@ public static String toLinkFile(String name, DataType type) { return name + type.suffix + DataType.LINK.suffix; } + public static String toLink(String path) { + return PathUtils.stripTrailingSlash(path) + DataType.LINK.suffix; + } + + public static String toVersionedLink(String path) { + return PathUtils.stripTrailingSlash(path) + DataType.VERSIONED_LINK.suffix; + } + public static String toTempTempFile(String file) { var type = DataType.fromFileName(file); diff --git a/model/src/main/scala/gorsat/Commands/CommandParseUtilities.scala b/model/src/main/scala/gorsat/Commands/CommandParseUtilities.scala index a703699a..2822c5dc 100644 --- a/model/src/main/scala/gorsat/Commands/CommandParseUtilities.scala +++ b/model/src/main/scala/gorsat/Commands/CommandParseUtilities.scala @@ -236,7 +236,7 @@ object CommandParseUtilities { val optionValue: String = stringValueOfOption(args, name) var value: Long = -1 try { - value = optionValue.toInt + value = optionValue.toLong } catch { case _: Throwable => throw new GorParsingException(s"Value $optionValue supplied with option $name is not a valid number", name, optionValue) } @@ -259,6 +259,33 @@ object CommandParseUtilities { } } + def epochValueOfOption(args: Array[String], name: String): Long = { + var optionValue: String = stringValueOfOption(args, name) + try { + if (optionValue.contains("-")) { + if (!optionValue.contains("T")) { + optionValue = optionValue + "T00:00:00Z" + } else if (!optionValue.contains("Z")) { + optionValue = optionValue + "Z" + } + java.time.Instant.parse(optionValue).toEpochMilli + } else { + optionValue.toLong + } + } catch { + case e: Throwable => throw new GorParsingException(s"Value $optionValue supplied with option $name is not a valid iso date/epoch", e) + } + } + + def epochValueOfOptionWithDefault(args: Array[String], name: String, defaultValue: Long = 0): Long = { + if (hasOption(args, name)) { + epochValueOfOption(args, name) + } else { + defaultValue + } + } + + def doubleValueOfOption(args: Array[String], name: String): Double = { val optionValue: String = stringValueOfOption(args, name) var value: Double = -1 diff --git a/model/src/main/scala/gorsat/gorsatDynIterator.scala b/model/src/main/scala/gorsat/gorsatDynIterator.scala index 6e654230..c3d49fd8 100644 --- a/model/src/main/scala/gorsat/gorsatDynIterator.scala +++ b/model/src/main/scala/gorsat/gorsatDynIterator.scala @@ -187,7 +187,21 @@ class DynamicRowSource(iteratorCommand : String, context: GorContext, fixHeader } i += 1 } - (if( prefix.nonEmpty ) prefix+";" else "") + (if( header ) pipeSteps.map(s => { if(s.toLowerCase.contains("sdl")) s else CommandParseUtilities.quoteSafeReplace(s,"|","| top 0 |") }).mkString("| top 0 |")+"| top 0" else pipeSteps.mkString("|")) + (if (prefix.nonEmpty) prefix+";" else "") + ( + if (header) { + pipeSteps.map(s => { + if (s.toLowerCase.contains("sdl")) { + s + } else if (s.toLowerCase.startsWith("write ")) { + // Skip write commands when getting header + "skip -1" + } else { + CommandParseUtilities.quoteSafeReplace(s,"|","| top 0 |") + } + }).mkString("| top 0 |")+"| top 0" + } else { + pipeSteps.mkString("|") + }) } def setPositionWithoutChrLimits(seekChr: String, seekPos : Int): Unit = { diff --git a/model/src/test/java/org/gorpipe/gor/driver/UTestGorDriver.java b/model/src/test/java/org/gorpipe/gor/driver/UTestGorDriver.java index f3eae5b1..c4e065b9 100644 --- a/model/src/test/java/org/gorpipe/gor/driver/UTestGorDriver.java +++ b/model/src/test/java/org/gorpipe/gor/driver/UTestGorDriver.java @@ -196,7 +196,7 @@ public void testBrokenLink() throws IOException { @Test public void testIndexedSourceReference() throws IOException { - DataSource source = gorDriver.getDataSource(new IndexableSourceReference(DataUtil.toFile("../tests/data/gor/genes", DataType.GOR), "foo", "bar", null, null, null)); + DataSource source = gorDriver.getDataSource(new IndexableSourceReference(DataUtil.toFile("../tests/data/gor/genes", DataType.GOR), "foo", "bar", null, null, 0, null)); Assert.assertTrue("source should exists", source.exists()); Assert.assertTrue(source.getSourceReference() instanceof IndexableSourceReference); @@ -219,7 +219,7 @@ public void testIndexedSourceReferenceWithLinkFile() throws IOException { FileUtils.writeStringToFile(linkFile, dataFile.getAbsolutePath(), Charset.defaultCharset()); // Create indexable source references - DataSource source = gorDriver.getDataSource(new IndexableSourceReference(linkFile.getAbsolutePath(), "foo", "bar", null, null, null)); + DataSource source = gorDriver.getDataSource(new IndexableSourceReference(linkFile.getAbsolutePath(), "foo", "bar", null, null, 0, null)); Assert.assertTrue("source should exists", source.exists()); // test that the data source source reference is of the correct type and includes the index and reference file @@ -235,7 +235,7 @@ public void testIndexedSourceReferenceWithLinkFileAndNoLinkExtension() throws IO FileUtils.writeStringToFile(linkFile, dataFile.getAbsolutePath(), Charset.defaultCharset()); // Create indexable source references - DataSource source = gorDriver.getDataSource(new IndexableSourceReference(linkFile.getAbsolutePath().replace(DataType.LINK.suffix, ""), "foo", "bar", null, null, null)); + DataSource source = gorDriver.getDataSource(new IndexableSourceReference(linkFile.getAbsolutePath().replace(DataType.LINK.suffix, ""), "foo", "bar", null, null, 0, null)); Assert.assertTrue("source should exists", source.exists()); // test that the data source source reference is of the correct type and includes the index and reference file diff --git a/model/src/test/java/org/gorpipe/gor/driver/linkfile/LinkFileTest.java b/model/src/test/java/org/gorpipe/gor/driver/linkfile/LinkFileTest.java new file mode 100644 index 00000000..6999013f --- /dev/null +++ b/model/src/test/java/org/gorpipe/gor/driver/linkfile/LinkFileTest.java @@ -0,0 +1,155 @@ +package org.gorpipe.gor.driver.linkfile; + +import org.gorpipe.gor.driver.providers.stream.sources.StreamSource; +import org.gorpipe.gor.driver.providers.stream.sources.file.FileSource; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class LinkFileTest { + + @Rule + public TemporaryFolder workDir = new TemporaryFolder(); + + private StreamSource mockSource; + private final String v1LinkFileContent = """ + ## SERIAL = 0 + ## VERSION = 1 + #FILE\tTIMESTAMP\tMD5\tSERIAL + source/v1/ver1.gorz\t1734304890790\tABCDEAF13422\t1 + source/v1/ver2.gorz\t1734305124533\t334DEAF13422\t2 + """; + private final String v0LinkFileContent = "source/v0/verx.gorz\n"; + private final String simpleFile = "source/y.gorz"; + protected Path workPath; + + @Before + public void setUp() { + workPath = workDir.getRoot().toPath().toAbsolutePath(); + mockSource = mock(StreamSource.class); + + } + + @Test + public void testCreateLinkFile() { + LinkFile linkFile = LinkFile.load(mockSource, v1LinkFileContent); + assertNotNull(linkFile); + assertEquals(2, linkFile.getEntries().size()); + assertEquals(100, linkFile.getEntriesCountMax()); + } + + @Test + public void testLoadLinkFile() throws IOException { + when(mockSource.exists()).thenReturn(true); + when(mockSource.open()).thenReturn(new ByteArrayInputStream(v1LinkFileContent.getBytes())); + LinkFile linkFile = LinkFile.load(mockSource); + assertNotNull(linkFile); + assertEquals(2, linkFile.getEntries().size()); + assertEquals(100, linkFile.getEntriesCountMax()); + } + + @Test + public void testAppendEntry() { + LinkFile linkFile = LinkFile.load(mockSource, v1LinkFileContent); + linkFile.appendEntry(simpleFile, "NEWMD5SUM"); + assertEquals(3, linkFile.getEntries().size()); + } + + @Test + public void testGetLatestPath() { + when(mockSource.getFullPath()).thenReturn("/mnt/csa/projects/test/x.link"); + LinkFile linkFile = LinkFile.load(mockSource, v1LinkFileContent); + assertEquals("/mnt/csa/projects/test/source/v1/ver2.gorz", linkFile.getLatestEntryUrl()); + linkFile.appendEntry(simpleFile, "NEWMD5SUM"); + assertEquals("/mnt/csa/projects/test/" + simpleFile, linkFile.getLatestEntryUrl()); + } + + @Test + public void testGetTimedPath() { + when(mockSource.getFullPath()).thenReturn("/mnt/csa/projects/test/x.link"); + LinkFile linkFile = LinkFile.load(mockSource, v1LinkFileContent); + linkFile.appendEntry(simpleFile, "NEWMD5SUM"); + + assertEquals(null, linkFile.getEntryUrl(1734304890790L - 100)); + assertEquals("/mnt/csa/projects/test/source/v1/ver1.gorz", linkFile.getEntryUrl(1734304890790L + 100)); + assertEquals("/mnt/csa/projects/test/source/v1/ver2.gorz", linkFile.getEntryUrl(1734305124533L)); + assertEquals("/mnt/csa/projects/test/source/v1/ver2.gorz", linkFile.getEntryUrl(1734305124533L + 100)); + assertEquals("/mnt/csa/projects/test/" + simpleFile, linkFile.getEntryUrl(System.currentTimeMillis())); + } + + @Test + public void testSaveNewV1LinkFile() throws IOException { + var linkPath = workPath.resolve("test.link"); + LinkFile linkFile = new LinkFileV1(new FileSource(linkPath.toString())); + linkFile.appendEntry(simpleFile, "NEWMD5SUM"); + linkFile.save(); + String savedContent = Files.readString(linkPath); + assertTrue(savedContent.contains("## VERSION = 1")); + assertTrue(savedContent.contains(simpleFile)); + } + + @Test + public void testSaveNewV0LinkFile() throws IOException { + var linkPath = workPath.resolve("test.link"); + LinkFile linkFile = new LinkFileV0(new FileSource(linkPath.toString())); + linkFile.appendEntry(simpleFile, "NEWMD5SUM"); + linkFile.save(); + String savedContent = Files.readString(linkPath); + assertEquals(simpleFile, savedContent.trim()); + } + + @Test + public void testSaveLinkFileV1ToV1() throws IOException { + var linkPath = workPath.resolve("test.link"); + Files.writeString(linkPath, v1LinkFileContent); + LinkFile linkFile = LinkFile.load(new FileSource(linkPath.toString())); + linkFile.appendEntry(simpleFile, "NEWMD5SUM"); + linkFile.save(); + String savedContent = Files.readString(linkPath); + assertTrue(savedContent.startsWith(v1LinkFileContent)); + assertTrue(savedContent.contains(simpleFile)); + } + + @Test + public void testSaveLinkFileV0ToV0() throws IOException { + var linkPath = workPath.resolve("test.link"); + Files.writeString(linkPath, "a/b/c.gorz"); + LinkFile linkFile = LinkFile.load(new FileSource(linkPath.toString())); + linkFile.appendEntry(simpleFile, "NEWMD5SUM"); + linkFile.save(); + String savedContent = Files.readString(linkPath); + assertEquals(simpleFile, savedContent.trim()); + } + + @Test + public void testSaveLinkFileV0ToV1() throws IOException { + var linkPath = workPath.resolve("test.link"); + Files.writeString(linkPath, "a/b/c.gorz"); + LinkFile linkFile = new LinkFileV1(new FileSource(linkPath.toString())); + linkFile.appendEntry(simpleFile, "NEWMD5SUM"); + linkFile.save(); + String savedContent = Files.readString(linkPath); + assertTrue(savedContent.contains("## VERSION = 1")); + assertTrue(savedContent.contains(simpleFile)); + } + + @Test + public void testSaveLinkFileV1ToV0() throws IOException { + var linkPath = workPath.resolve("test.link"); + Files.writeString(linkPath, v1LinkFileContent); + LinkFile linkFile = new LinkFileV0(new FileSource(linkPath.toString())); + linkFile.appendEntry(simpleFile, "NEWMD5SUM"); + linkFile.save(); + String savedContent = Files.readString(linkPath); + assertEquals(simpleFile, savedContent.trim()); + } +} diff --git a/model/src/test/java/org/gorpipe/gor/driver/linkfile/UTestTimeTravel.java b/model/src/test/java/org/gorpipe/gor/driver/linkfile/UTestTimeTravel.java new file mode 100644 index 00000000..190c7b8a --- /dev/null +++ b/model/src/test/java/org/gorpipe/gor/driver/linkfile/UTestTimeTravel.java @@ -0,0 +1,118 @@ +package org.gorpipe.gor.driver.linkfile; + +import gorsat.TestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.junit.Assert.assertEquals; + +public class UTestTimeTravel { + + @Rule + public TemporaryFolder workDir = new TemporaryFolder(); + + protected Path workPath; + private String aold = """ + chrom\tpos\tval + 1\t100\taold + """; + private String anew = """ + chrom\tpos\tval + 1\t100\tanew + """; + private String bold = """ + chrom\tpos\tval + 1\t100\tbold + """; + private String bnew = """ + chrom\tpos\tval + 1\t100\tbnew + """; + private LinkFile linkFileB; + + @Before + public void setup() throws IOException { + workPath = workDir.getRoot().toPath().toAbsolutePath(); + Files.createDirectories(workPath.resolve("result_cache")); + + setupData(); + } + + @Test + public void testNoTimeTravel() { + assertEquals(anew, TestUtils.runGorPipe("gor A.gor.link", "-gorroot", workPath.toString())); + } + + @Test + public void testTimeTravelSimple() { + assertEquals(aold, TestUtils.runGorPipe("gor -time 1500000000000 A.gor.link", "-gorroot", workPath.toString())); + assertEquals(anew, TestUtils.runGorPipe("gor -time 2500000000000 A.gor.link", "-gorroot", workPath.toString())); + } + + @Test + public void testTimeTravelWithNestedQuery() { + assertEquals(anew, TestUtils.runGorPipe("gor -time 1500000000000 <(gor A.gor.link)", "-gorroot", workPath.toString())); + } + + @Test + public void testTimeTravelOnlyInNestedQuery() { + assertEquals(aold, TestUtils.runGorPipe("gor <(gor -time 1500000000000 A.gor.link)", "-gorroot", workPath.toString())); + } + + @Test + public void testTimeTravelISO() { + assertEquals(aold, TestUtils.runGorPipe("gor -time 2017-07-14T02:40:00Z A.gor.link", "-gorroot", workPath.toString())); + } + + @Test + public void testTimeTravelISOShort() { + assertEquals(aold, TestUtils.runGorPipe("gor -time 2017-07-14 A.gor.link", "-gorroot", workPath.toString())); + } + + @Test + public void testQueryIntegrityChangeWhileRunning() { + // Use the timestamp when query started. + assertEquals(anew, TestUtils.runGorPipe(""" + create before = gor A.gor.link; + create update = gor B.gor.link | join -snpsnp [before] | select 1-3 | write Alatest.gor -vlink A.gor.link; + create after = gor A.gor.link | join -snpsnp [update] | select 1-3; + gor [after] + """, "-gorroot", workPath.toString())); + + // Force use latest. + assertEquals(bnew, TestUtils.runGorPipe(String.format(""" + create before = gor A.gor.link; + create update = gor B.gor.link | join -snpsnp [before] | select 1-3 | write Alatest.gor -vlink A.gor.link; + create after = gor -time %d A.gor.link | join -snpsnp [update] | select 1-3; + gor [after] + """, Long.MAX_VALUE), "-gorroot", workPath.toString())); + } + + private void setupData() throws IOException { + Files.writeString(workPath.resolve("Aold.gor"), aold); + Files.writeString(workPath.resolve("Anew.gor"), anew); + Files.writeString(workPath.resolve("Bold.gor"), bold); + Files.writeString(workPath.resolve("Bnew.gor"), bnew); + + Files.writeString(workPath.resolve("A.gor.link"), """ + ## VERSION = 1 + #File\tTimestamp\tMD5\tSerial + Aold.gor\t1000000000000\t\t1 + Anew.gor\t1700000000000\t\t2 + """); + + Files.writeString(workPath.resolve("B.gor.link"), """ + ## VERSION = 1 + #File\tTimestamp\tMD5\tSerial + Bold.gor\t1000000000000\t\t1 + Bnew.gor\t1700000000000\t\t2 + """); + } + +} diff --git a/model/src/test/java/org/gorpipe/gor/driver/utils/LinkFileTest.java b/model/src/test/java/org/gorpipe/gor/driver/utils/LinkFileTest.java deleted file mode 100644 index 96a8e919..00000000 --- a/model/src/test/java/org/gorpipe/gor/driver/utils/LinkFileTest.java +++ /dev/null @@ -1,87 +0,0 @@ -package org.gorpipe.gor.driver.utils; - -import org.gorpipe.gor.driver.providers.stream.sources.StreamSource; -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -public class LinkFileTest { - - private StreamSource mockSource; - private String linkFileContent; - - @Before - public void setUp() { - mockSource = mock(StreamSource.class); - linkFileContent = "## VERSION=1\n" + - "## ENTRIES_COUNT_MAX=100\n" + - "## ENTRIES_AGE_MAX=31536000000\n" + - "# FILE\tTIMESTAMP\tMD5\tSERIAL\n" + - "source/var/var1.gorz\t1734304890790\tABCDEAF13422\t1\n" + - "source/var/var2.gorz\t1734305124533\t334DEAF13422\t2\n"; - } - - @Test - public void testCreateLinkFile() { - LinkFile linkFile = LinkFile.create(mockSource, linkFileContent); - assertNotNull(linkFile); - assertEquals(2, linkFile.getEntries().size()); - assertEquals(100, linkFile.getEntriesCountMax()); - assertEquals(31536000000L, linkFile.getEntriesAgeMax()); - } - - @Test - public void testLoadLinkFile() throws IOException { - when(mockSource.open()).thenReturn(new ByteArrayInputStream(linkFileContent.getBytes())); - LinkFile linkFile = LinkFile.load(mockSource); - assertNotNull(linkFile); - assertEquals(2, linkFile.getEntries().size()); - assertEquals(100, linkFile.getEntriesCountMax()); - assertEquals(31536000000L, linkFile.getEntriesAgeMax()); - } - - @Test - public void testAppendEntry() { - LinkFile linkFile = LinkFile.create(mockSource, linkFileContent); - linkFile.appendEntry("source/var/var3.gorz", "NEWMD5SUM"); - assertEquals(3, linkFile.getEntries().size()); - } - - @Test - public void testGetLatestPath() { - when(mockSource.getFullPath()).thenReturn("/mnt/csa/projects/test/x.link"); - LinkFile linkFile = LinkFile.create(mockSource, linkFileContent); - assertEquals("/mnt/csa/projects/test/source/var/var2.gorz", linkFile.getLatestEntryUrl()); - linkFile.appendEntry("source/var/var3.gorz", "NEWMD5SUM"); - assertEquals("/mnt/csa/projects/test/source/var/var3.gorz", linkFile.getLatestEntryUrl()); - } - - @Test - public void testGetTimedPath() { - when(mockSource.getFullPath()).thenReturn("/mnt/csa/projects/test/x.link"); - LinkFile linkFile = LinkFile.create(mockSource, linkFileContent); - linkFile.appendEntry("source/var/var3.gorz", "NEWMD5SUM"); - - assertEquals(null, linkFile.getEntryUrl(1734304890790L - 100)); - assertEquals("/mnt/csa/projects/test/source/var/var1.gorz", linkFile.getEntryUrl(1734304890790L + 100)); - assertEquals("/mnt/csa/projects/test/source/var/var2.gorz", linkFile.getEntryUrl(1734305124533L)); - assertEquals("/mnt/csa/projects/test/source/var/var2.gorz", linkFile.getEntryUrl(1734305124533L + 100)); - assertEquals("/mnt/csa/projects/test/source/var/var3.gorz", linkFile.getEntryUrl(System.currentTimeMillis())); - } - - @Test - public void testSaveLinkFile() throws IOException { - LinkFile linkFile = LinkFile.create(mockSource, linkFileContent); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - linkFile.save(outputStream); - String savedContent = outputStream.toString(); - assertTrue(savedContent.contains("## VERSION = 1")); - assertTrue(savedContent.contains("source/var/var2.gorz\t1734305124533\t334DEAF13422\t2")); - } -} diff --git a/model/src/test/java/org/gorpipe/gor/model/UTestBaseMeta.java b/model/src/test/java/org/gorpipe/gor/model/UTestBaseMeta.java index 392f6bd8..13aa7d53 100644 --- a/model/src/test/java/org/gorpipe/gor/model/UTestBaseMeta.java +++ b/model/src/test/java/org/gorpipe/gor/model/UTestBaseMeta.java @@ -16,7 +16,7 @@ public class UTestBaseMeta { @Test public void testReadFromGorz() { BaseMeta meta = new BaseMeta(); - meta.loadAndMergeMeta(Path.of("../tests/data/gor/genes.gorz")); + meta.loadAndMergeMeta(new DriverBackedFileReader(""),"../tests/data/gor/genes.gorz"); Assert.assertEquals("Chrom\tgene_start\tgene_end\tGene_Symbol", String.join("\t", meta.getColumns())); } @@ -25,7 +25,7 @@ public void testReadFromColumnCompressedGorz() { String columnCompressedGorz = Path.of(tf.getRoot().getAbsolutePath()).resolve("genes.cc.gorz").toString(); TestUtils.runGorPipe("gor ../tests/data/gor/genes.gorz | write -c " + columnCompressedGorz); BaseMeta meta = new BaseMeta(); - meta.loadAndMergeMeta(Path.of(columnCompressedGorz)); + meta.loadAndMergeMeta(new DriverBackedFileReader(""), columnCompressedGorz); Assert.assertEquals("Chrom\tgene_start\tgene_end\tGene_Symbol", String.join("\t", meta.getColumns())); }