diff --git a/.tool-versions b/.tool-versions index adcc73e9..71c0000b 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1 +1 @@ -nodejs 20.20.2 +nodejs 22.17.1 diff --git a/LICENSE.md b/LICENSE.md index 81013701..ca7abfd2 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,6 +1,10 @@ Copyright for portions of the project are (c) 2017 ironSource Ltd. https://github.com/ironSource/parquetjs -Copyright for portions of the project are https://github.com/ZJONSSON/parquetjs -Copyright for portions of the project are https://github.com/LibertyDSNP/parquetjs + +Copyright for portions of the project are (c) https://github.com/ZJONSSON/parquetjs + +Copyright for portions of the project are (c) https://github.com/LibertyDSNP/parquetjs + +Copyright for portions of the project are (c) https://github.com/hyparam/hyparquet Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the diff --git a/lib/codec/encoding.ts b/lib/codec/encoding.ts new file mode 100644 index 00000000..38e464f8 --- /dev/null +++ b/lib/codec/encoding.ts @@ -0,0 +1,210 @@ +/// +/// Modified under the MIT License from +/// https://github.com/hyparam/hyparquet +/// The MIT License (MIT) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +import {ParquetType} from "../declare"; +import {DataReader, DecodedArray } from "./types"; + +/** + * Var int, also known as Unsigned LEB128. + * Var ints take 1 to 5 bytes (int32) or 1 to 10 bytes (int64). + * Reads groups of 7 low bits until high bit is 0. + * + * @param {DataReader} reader + * @returns {number} value + */ +export function readVarInt(reader: DataReader) { + let result = 0 + let shift = 0 + while (true) { + const byte = reader.view.getUint8(reader.offset++) + result |= (byte & 0x7f) << shift + if (!(byte & 0x80)) { + return result + } + shift += 7 + } +} + +/** + * Minimum bits needed to store value. + * + * @param {number} value + * @returns {number} + */ +export function bitWidth(value: number): number { + return 32 - Math.clz32(value) +} + +/** + * Read values from a run-length encoded/bit-packed hybrid encoding. + * + * If length is zero, then read int32 length at the start. + * + * @typedef {import("./types.d.ts").DataReader} DataReader + * @typedef {import("./types.d.ts").DecodedArray} DecodedArray + * @param {DataReader} reader + * @param {number} width - width of each bit-packed group + * @param {number} length - length of the encoded data, in bytes (?) + * @param {DecodedArray} output + * @param {disableEnvelope} - set to true to consume entire buffer, false to assume (and therefore skip) a 4 byte header + */ +export function readRleBitPackedHybrid(reader: DataReader, width: number, length: number, output: DecodedArray, disableEnvelope?: boolean) { + + if (!disableEnvelope) { + reader.offset += 4 + } + let seen = 0 + while (seen < output.length) { + const header = readVarInt(reader) + if (header & 1) { + // bit-packed + seen = readBitPacked(reader, header, width, output, seen) + } else { + // rle + const count = header >>> 1 + readRle(reader, count, width, output, seen) + seen += count + } + } +} + +/** + * Run-length encoding: read value with bitWidth and repeat it count times. + * + * @param {DataReader} reader + * @param {number} count + * @param {number} bitWidth + * @param {DecodedArray} output + * @param {number} seen + */ +export function readRle(reader: DataReader, + count: number, + bitWidth: number, + output: DecodedArray, + seen: number) { + const width = bitWidth + 7 >> 3 + let value = 0 + for (let i = 0; i < width; i++) { + value |= reader.view.getUint8(reader.offset++) << (i << 3) + } + // assert(value < 1 << bitWidth) + + // repeat value count times + for (let i = 0; i < count; i++) { + output[seen + i] = value + } +} + +/** + * Read a bit-packed run of the rle/bitpack hybrid. + * Supports width > 8 (crossing bytes). + * + * @param {DataReader} reader + * @param {number} header - bit-pack header + * @param {number} bitWidth + * @param {DecodedArray} output + * @param {number} seen + * @returns {number} total output values so far + */ +export function readBitPacked(reader: DataReader, + header: number, + bitWidth: number, + output: DecodedArray, + seen: number): number { + let count = header >> 1 << 3 // values to read + const mask = (1 << bitWidth) - 1 + + let data = 0 + if (reader.offset < reader.view.byteLength) { + data = reader.view.getUint8(reader.offset++) + } else if (mask) { + // sometimes out-of-bounds reads are masked out + throw new Error(`parquet bitpack offset ${reader.offset} out of range`) + } + let left = 8 + let right = 0 + + // read values + while (count) { + // if we have crossed a byte boundary, shift the data + if (right > 8) { + right -= 8 + left -= 8 + data >>>= 8 + } else if (left - right < bitWidth) { + // if we don't have bitWidth number of bits to read, read next byte + data |= reader.view.getUint8(reader.offset) << left + reader.offset++ + left += 8 + } else { + if (seen < output.length) { + // emit value + output[seen++] = data >> right & mask + } + count-- + right += bitWidth + } + } + + return seen +} + +/** + * @typedef {import("./types.d.ts").ParquetType} ParquetType + * @param {DataReader} reader + * @param {number} count + * @param {ParquetType} type + * @param {number | undefined} typeLength + * @returns {DecodedArray} + */ +export function byteStreamSplit(reader: DataReader, count: number, type: ParquetType, typeLength: number|undefined) { + const width = byteWidth(type, typeLength) + const bytes = new Uint8Array(count * width) + for (let b = 0; b < width; b++) { + for (let i = 0; i < count; i++) { + bytes[i * width + b] = reader.view.getUint8(reader.offset++) + } + } + // interpret bytes as typed array + if (type === 'FLOAT') return new Float32Array(bytes.buffer) + else if (type === 'DOUBLE') return new Float64Array(bytes.buffer) + else if (type === 'INT32') return new Int32Array(bytes.buffer) + else if (type === 'INT64') return new BigInt64Array(bytes.buffer) + else if (type === 'FIXED_LEN_BYTE_ARRAY') { + // split into arrays of typeLength + const split = new Array(count) + for (let i = 0; i < count; i++) { + split[i] = bytes.subarray(i * width, (i + 1) * width) + } + return split + } + throw new Error(`parquet byte_stream_split unsupported type: ${type}`) +} + +/** + * @param {ParquetType} type + * @param {number | undefined} typeLength + * @returns {number} + */ +function byteWidth(type: ParquetType, typeLength: number|undefined): number { + switch (type) { + case 'INT32': + case 'FLOAT': + return 4 + case 'INT64': + case 'DOUBLE': + return 8 + case 'FIXED_LEN_BYTE_ARRAY': + if (!typeLength) throw new Error('parquet byteWidth missing type_length') + return typeLength + default: + throw new Error(`parquet unsupported type: ${type}`) + } +} diff --git a/lib/codec/plain_dictionary.ts b/lib/codec/plain_dictionary.ts index 916730b3..34c521d9 100644 --- a/lib/codec/plain_dictionary.ts +++ b/lib/codec/plain_dictionary.ts @@ -1,8 +1,18 @@ import * as rle from './rle'; -import { Cursor, Options } from './types'; +import {Cursor, DataReader, DecodedArray, Options} from './types' +import {readRleBitPackedHybrid} from "./encoding"; export const decodeValues = function (type: string, cursor: Cursor, count: number, opts: Options) { const bitWidth = cursor.buffer.subarray(cursor.offset, cursor.offset + 1).readInt8(0); cursor.offset += 1; - return rle.decodeValues(type, cursor, count, Object.assign({}, opts, { disableEnvelope: true, bitWidth })); + // old: + // return rle.decodeValues(type, cursor, count, Object.assign({}, opts, { disableEnvelope: true, bitWidth })); + const reader: DataReader = { + view: new DataView(cursor.buffer.buffer, cursor.offset), + offset: 0, + } + const output: DecodedArray = new Array(count); + const disableEnvelope = true; + readRleBitPackedHybrid(reader, bitWidth, count, output, disableEnvelope) + return output; }; diff --git a/lib/codec/rle.ts b/lib/codec/rle.ts index 32a43e18..a759f1c4 100644 --- a/lib/codec/rle.ts +++ b/lib/codec/rle.ts @@ -3,7 +3,8 @@ // https://github.com/apache/parquet-format/blob/master/Encodings.md import varint from 'varint'; -import { Cursor } from './types'; +import {Cursor} from './types'; +import {readBitPacked, readRle, readRleBitPackedHybrid, readVarInt} from "./encoding"; function encodeRunBitpacked(values: number[], opts: { bitWidth: number }) { for (let i = 0; i < values.length % 8; i++) { @@ -44,9 +45,11 @@ function unknownToParsedInt(value: string | number) { export const encodeValues = function ( type: string, - values: number[], - opts: { bitWidth: number; disableEnvelope?: boolean } -) { + values: Array, + opts: { + bitWidth: number, + disableEnvelope?: boolean + }) { if (!('bitWidth' in opts)) { throw new Error('bitWidth is required'); } @@ -105,23 +108,67 @@ export const encodeValues = function ( return envelope; }; -function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: number }) { - if (count % 8 !== 0) { - throw new Error('must be a multiple of 8'); - } - - const values = new Array(count).fill(0); - for (let b = 0; b < opts.bitWidth * count; ++b) { - if (cursor.buffer[cursor.offset + Math.floor(b / 8)] & (1 << (b % 8))) { - values[Math.floor(b / opts.bitWidth)] |= 1 << (b % opts.bitWidth); - } - } - - cursor.offset += opts.bitWidth * (count / 8); - return values; +// opts.bitWidth is undefined when the boolean values are being passed +// decode a bitpacked value +// setting old code to true here only results in the RLE/bitpacked hybrid test failing, so we know that code is bad. +// cursor: Cursor containing the data to be decoded +// count: the number of values expected to result from the decoding +// opts: bitWidth is required. +// returns: a DecodedArray +export function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: number }): Array { + const output = new Array(count).fill(0); + const bytesNeeded = Math.ceil((opts.bitWidth * count) / 8); + + // const values = new Array(count).fill(0); + // for (let b = 0; b < opts.bitWidth * count; ++b) { + // if (cursor.buffer[cursor.offset + Math.floor(b / 8)] & (1 << (b % 8))) { + // values[Math.floor(b / opts.bitWidth)] |= 1 << (b % opts.bitWidth); + // } + // } + // + // cursor.offset += opts.bitWidth * (count / 8); + // } else { + // const view = new DataView(cursor.buffer.buffer, cursor.offset); + // const reader = {view, offset: 0} + // const header = readVarInt(reader); + // readBitPacked(reader, header, opts.bitWidth, output, 0) + // } + // return output; + // } + + // IMPORTANT: Create DataView with proper offset handling + // Buffer.buffer might have an internal byteOffset we need to account for + const view = new DataView( + cursor.buffer.buffer, + cursor.buffer.byteOffset + cursor.offset, + bytesNeeded + ); + const reader = {view, offset: 0} + // DON'T read a header - we're already past it! + // The header was already consumed by decodeValues + // We just need to decode the bit-packed data directly + + // Create a fake header for the bit-packed run + // count is already the number of values (multiple of 8) + const header = ((count / 8) << 1) | 1; // bit-packed header + const seen = readBitPacked(reader, header, opts.bitWidth, output, 0); + // Update cursor position + cursor.offset += bytesNeeded; + + return output; } -function decodeRunRepeated(cursor: Cursor, count: number, opts: { bitWidth: number }) { +// decode an RLE value +// Note that the RLE encoding method is only supported for the following types of data: +// +// Repetition and definition levels +// Dictionary indices +// Boolean values in data pages, as an alternative to PLAIN encoding +// See https://parquet.apache.org/docs/file-format/data-pages/encodings/ +// setting this to run old code lets the RLE/bitpacked hybrid documentation example still pass. +// So maybe this code is fine. +export function decodeRunRepeated(cursor: Cursor, count: number, opts: { bitWidth: number }): Array { + let output = new Array(count).fill(0); const bytesNeededForFixedBitWidth = Math.ceil(opts.bitWidth / 8); let value = 0; @@ -133,26 +180,26 @@ function decodeRunRepeated(cursor: Cursor, count: number, opts: { bitWidth: numb cursor.offset += 1; } - return new Array(count).fill(value); + output = new Array(count).fill(value); + return output; } -export const decodeValues = function ( - _: string, - cursor: Cursor, - count: number, - opts: { bitWidth: number; disableEnvelope?: boolean } -) { +// cursor: contains buffer + offset for data +// count: the number of items expected to decode +// opts: must include bitWidth, disableEnvelope is optional, specify true to use all bytes, false to skip first four +// bytes +export const decodeValues = function (_: string, cursor: Cursor, count: number, opts: { + bitWidth: number, + disableEnvelope?: boolean +}) { if (!('bitWidth' in opts)) { throw new Error('bitWidth is required'); } - + let values = []; + let res; if (!opts.disableEnvelope) { cursor.offset += 4; } - - let values = []; - let res; - while (values.length < count) { const header = varint.decode(cursor.buffer, cursor.offset); cursor.offset += varint.encodingLength(header); diff --git a/lib/codec/types.ts b/lib/codec/types.ts index d83438b2..32e0d02f 100644 --- a/lib/codec/types.ts +++ b/lib/codec/types.ts @@ -30,3 +30,18 @@ export interface Cursor { offset: number; size?: number; } + +export interface DataReader { + view: DataView + offset: number +} + +export type DecodedArray = + Uint8Array | + Int32Array | + BigInt64Array | + BigUint64Array | + Float32Array | + Float64Array | + any[] + diff --git a/lib/datapageV2.ts b/lib/datapageV2.ts new file mode 100644 index 00000000..0cbb8872 --- /dev/null +++ b/lib/datapageV2.ts @@ -0,0 +1,55 @@ +/// +/// Modified under the MIT License from +/// https://github.com/hyparam/hyparquet +/// The MIT License (MIT) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +import {DataReader} from "./codec/types"; +import {DataPageHeaderV2} from "../gen-nodejs/parquet_types"; +import {readRleBitPackedHybrid} from "./codec/encoding"; +import {getBitWidth} from "./util"; + +/** + * @typedef {import("./types.d.ts").DataReader} DataReader + * @param {DataReader} reader + * @param {DataPageHeaderV2} daph2 data page header v2 + * @param {number} rLevelMax the maximum of repetition levels + * @returns {any[]} repetition levels + */ +export const readRepetitionLevelsV2 = (reader: DataReader, + daph2: DataPageHeaderV2, + rLevelMax: number, +): Array => { + const values = new Array(daph2.num_values) + if (!rLevelMax) return values.fill(0); + const bitWidth = getBitWidth(rLevelMax) + let disableEnvelope = daph2.definition_levels_byte_length === 0 + readRleBitPackedHybrid(reader, bitWidth, daph2.repetition_levels_byte_length, values, disableEnvelope) + return values +} + +/** + * @typedef {import("./types.d.ts").DataReader} DataReader + * @param {DataReader} reader + * @param {DataPageHeaderV2} daph2 data page header v2 + * @param {number} dLevelMax the maximum of definition levels + * @returns {number[] | undefined} definition levels + */ +export const readDefinitionLevelsV2 = (reader: DataReader, + daph2: DataPageHeaderV2, + dLevelMax: number, +): Array|undefined => { + if (dLevelMax) { + // V2 we know the length + const values = new Array(daph2.num_values) + const bitWidth = getBitWidth(dLevelMax) + const disableEnvelope = true + readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, values, disableEnvelope) + return values + } +} diff --git a/lib/reader.ts b/lib/reader.ts index 8833538b..5052eeeb 100644 --- a/lib/reader.ts +++ b/lib/reader.ts @@ -27,6 +27,8 @@ import { Cursor, Options } from './codec/types'; import { GetObjectCommand, HeadObjectCommand, S3Client } from '@aws-sdk/client-s3'; import type { Readable } from 'stream'; import type { Blob } from 'buffer'; +import {readDefinitionLevelsV2, readRepetitionLevelsV2} from "./datapageV2"; +import {dataReaderFromCursor} from "./util"; const { getBloomFiltersFor } = bloomFilterReader; @@ -143,7 +145,7 @@ export class ParquetReader { /** * Open the parquet file from S3 using the supplied aws client [, commands] and params * The params have to include `Bucket` and `Key` to the file requested, - * If using v3 of the AWS SDK, combine the client and commands into an object wiht keys matching + * If using v3 of the AWS SDK, combine the client and commands into an object with keys matching * the original module names, and do not instantiate the commands; pass them as classes/modules. * * This function returns a new parquet reader [ or throws an Error.] @@ -161,7 +163,7 @@ export class ParquetReader { } /** - * Open the parquet file from a url using the supplied request module + * Open the parquet file from a URL using the supplied request module * params should either be a string (url) or an object that includes * a `url` property. * This function returns a new parquet reader @@ -548,9 +550,7 @@ export class ParquetEnvelopeReader { const headers = Object.assign({}, defaultHeaders, { range }); const response = await fetch(url, { headers }); const arrayBuffer = await response.arrayBuffer(); - const buffer = Buffer.from(arrayBuffer); - - return buffer; + return Buffer.from(arrayBuffer); }; const closeFn = () => ({}); @@ -739,7 +739,6 @@ export class ParquetEnvelopeReader { buffer.columnData![colKey.join(',')] = await this.readColumnChunk(schema, colChunk); } - return buffer; } @@ -944,16 +943,20 @@ async function decodePages(buffer: Buffer, opts: Options) { pageData.values = pageData.values!.map((d) => opts.dictionary![d]); } - const length = pageData.rlevels != undefined ? pageData.rlevels.length : 0; + const length = pageData.rlevels !== undefined ? pageData.rlevels?.length : 0; - for (let i = 0; i < length; i++) { - data.rlevels!.push(pageData.rlevels![i]); - data.dlevels!.push(pageData.dlevels![i]); - const value = pageData.values![i]; - if (value !== undefined) { - data.values!.push(value); - } + if (pageData.rlevels?.length) { + data.rlevels = pageData.rlevels; + } + if (pageData.dlevels?.length) { + data.dlevels = pageData.dlevels; } + if (length > 0) { + pageData.values?.forEach(val => { + if (val !== undefined) data.values!.push(val) + }) + } + data.count! += pageData.count!; data.pageHeaders!.push(pageData.pageHeader!); } @@ -994,7 +997,7 @@ async function decodeDictionaryPage(cursor: Cursor, header: parquet_thrift.PageH ); } -async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options) { +async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options): Promise> { const cursorEnd = cursor.offset + header.compressed_page_size; const dataPageHeader = header.data_page_header!; @@ -1070,7 +1073,7 @@ async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader, }; } -async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options) { +async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options): Promise> { const cursorEnd = cursor.offset + header.compressed_page_size; const dataPageHeaderV2 = header.data_page_header_v2!; @@ -1079,26 +1082,17 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade const valueEncoding = parquet_util.getThriftEnum(parquet_thrift.Encoding, dataPageHeaderV2.encoding); /* read repetition levels */ - let rLevels = new Array(valueCount); - if (opts.rLevelMax! > 0) { - rLevels = decodeValues(PARQUET_RDLVL_TYPE, PARQUET_RDLVL_ENCODING, cursor, valueCount, { - bitWidth: parquet_util.getBitWidth(opts.rLevelMax!), - disableEnvelope: true, - }); - } else { - rLevels.fill(0); - } + let rLevels: Array; + let reader = dataReaderFromCursor(cursor, 0) + + rLevels = readRepetitionLevelsV2(reader, dataPageHeaderV2, opts.rLevelMax || 0); + reader.offset = dataPageHeaderV2.repetition_levels_byte_length; /* read definition levels */ - let dLevels = new Array(valueCount); - if (opts.dLevelMax! > 0) { - dLevels = decodeValues(PARQUET_RDLVL_TYPE, PARQUET_RDLVL_ENCODING, cursor, valueCount, { - bitWidth: parquet_util.getBitWidth(opts.dLevelMax!), - disableEnvelope: true, - }); - } else { - dLevels.fill(0); - } + let dLevels: Array|undefined; + dLevels = readDefinitionLevelsV2(reader, dataPageHeaderV2, opts.dLevelMax || 0) + cursor.offset += reader.offset; + /* read values */ let valuesBufCursor = cursor; @@ -1106,8 +1100,7 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade if (dataPageHeaderV2.is_compressed) { const valuesBuf = await parquet_compression.inflate( opts.compression!, - cursor.buffer.subarray(cursor.offset, cursorEnd) - ); + cursor.buffer.subarray(cursor.offset, cursorEnd)); valuesBufCursor = { buffer: valuesBuf, @@ -1118,7 +1111,12 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade cursor.offset = cursorEnd; } - const values = decodeValues(opts.type!, valueEncoding as ParquetCodec, valuesBufCursor, valueCountNonNull, { + const values = decodeValues( + opts.type!, + valueEncoding as ParquetCodec, + valuesBufCursor, + valueCountNonNull, + { bitWidth: opts.column!.typeLength!, treatInt96AsTimestamp: opts.treatInt96AsTimestamp, ...opts.column!, diff --git a/lib/types.ts b/lib/types.ts index c76dcd69..8cdfd507 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -117,6 +117,7 @@ const PARQUET_LOGICAL_TYPES = new Set([ const PARQUET_LOGICAL_TYPE_DATA: Record = { BOOLEAN: { primitiveType: 'BOOLEAN', + typeLength: 1, toPrimitive: toPrimitive_BOOLEAN, fromPrimitive: fromPrimitive_BOOLEAN, }, diff --git a/lib/util.ts b/lib/util.ts index bfd65e24..f7ade1e1 100644 --- a/lib/util.ts +++ b/lib/util.ts @@ -4,6 +4,7 @@ import fs, { WriteStream } from 'fs'; import * as parquet_thrift from '../gen-nodejs/parquet_types'; import { FileMetaDataExt, WriterOptions } from './declare'; import { Int64 } from 'thrift'; +import {Cursor, DataReader} from "./codec/types"; // Use this so users only need to implement the minimal amount of the WriteStream interface export type WriteStreamMinimal = Pick; @@ -111,7 +112,9 @@ export const getBitWidth = function (val: number) { */ export const getThriftEnum = function (klass: Enums, value: unknown) { for (const k in klass) { + console.log(`Does ${klass[k]} ${value}?`) if (klass[k] === value) { + console.log("FOUND!") return k; } } @@ -232,3 +235,9 @@ export const fieldIndexOf = function (arr: unknown[][], elem: unknown[]) { export const cloneInteger = (int: Int64) => { return new Int64(int.valueOf()); }; + + +export function dataReaderFromCursor(data: Cursor, offset?: number): DataReader { + let view = new DataView(data.buffer.buffer, data.offset); + return { view , offset: offset || 0}; +} diff --git a/test/codec_rle.js b/test/codec_rle.js index 7b2528ad..5c7cb770 100644 --- a/test/codec_rle.js +++ b/test/codec_rle.js @@ -2,6 +2,14 @@ const chai = require('chai'); const assert = chai.assert; const parquet_codec_rle = require('../lib/codec/rle'); +const {readRleBitPackedHybrid} = require("../lib/codec/encoding"); + +function dataViewFromArray(data) { + const ab = new ArrayBuffer(data.length,{ maxByteLength: data.length }); + let view = new DataView(ab, 0); + data.forEach((val,idx) => view.setUint8(idx, val)); + return view +} describe('ParquetCodec::RLE', function () { it('should encode bitpacked values', function () { @@ -72,20 +80,30 @@ describe('ParquetCodec::RLE', function () { }); it('should decode repeated values', function () { + const data = [0x10, 0x87, 0xd6, 0x12]; + let cursor = { + buffer: Buffer.from(data), + offset: 0, + }; + const expectedDecoded = [1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567] + const bitWidth = 21; + const disableEnvelope = true; let vals = parquet_codec_rle.decodeValues( - 'INT32', - { - buffer: Buffer.from([0x10, 0x87, 0xd6, 0x12]), - offset: 0, - }, - 8, - { - disableEnvelope: true, - bitWidth: 21, - } + 'UNUSED', + cursor, + expectedDecoded.length, + { disableEnvelope, bitWidth} ); - assert.deepEqual(vals, [1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567]); + assert.deepEqual(vals, expectedDecoded); + + let view = dataViewFromArray(data); + const reader = { view , offset: 0 }; + let output = new Array(expectedDecoded.length); + readRleBitPackedHybrid(reader, bitWidth, data.length, output, disableEnvelope) + + assert.deepEqual(output, expectedDecoded); + }); it('should encode mixed runs', function () { @@ -102,19 +120,28 @@ describe('ParquetCodec::RLE', function () { }); it('should decode mixed runs', function () { + const expectedDecoded = [0, 1, 2, 3, 4, 5, 6, 7, 4, 4, 4, 4, 4, 4, 4, 4, 0, 1, 2, 3, 4, 5, 6, 7]; + const data = [0x03, 0x88, 0xc6, 0xfa, 0x10, 0x04, 0x03, 0x88, 0xc6, 0xfa]; + const disableEnvelope = true; + const bitWidth = 3; + let vals = parquet_codec_rle.decodeValues( - 'INT32', + 'UNUSED', { - buffer: Buffer.from([0x03, 0x88, 0xc6, 0xfa, 0x10, 0x04, 0x03, 0x88, 0xc6, 0xfa]), + buffer: Buffer.from(data), offset: 0, }, - 24, - { - disableEnvelope: true, - bitWidth: 3, - } - ); + expectedDecoded.length, + { disableEnvelope, bitWidth }); + + assert.deepEqual(vals,expectedDecoded); + + let view = dataViewFromArray(data); + const reader = { view , offset: 0 }; + let output = new Array(expectedDecoded.length); + readRleBitPackedHybrid(reader, bitWidth, data.length, output, disableEnvelope) + + assert.deepEqual(output, expectedDecoded); - assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7, 4, 4, 4, 4, 4, 4, 4, 4, 0, 1, 2, 3, 4, 5, 6, 7]); }); }); diff --git a/test/lib/codec/rle.test.ts b/test/lib/codec/rle.test.ts new file mode 100644 index 00000000..fa83b4cd --- /dev/null +++ b/test/lib/codec/rle.test.ts @@ -0,0 +1,69 @@ +import {expect} from 'chai'; +import {decodeRunBitpacked} from '../../../lib/codec/rle'; +import {readRleBitPackedHybrid} from "../../../lib/codec/encoding"; + +describe('RLE Codec', function () { + it('can decode a known bitpack value', function () { + // 136 = 10001000, 0x80 = 128 or 100000000 + const bitPackedDecBuffer = Buffer.from([136, 1, 7, 251, 127, 127, 28, 1, 9, 254, 251, 191, 63]); + + const myView = new DataView(bitPackedDecBuffer.buffer); + let decoder = new TextDecoder('utf-8'); + const res = bitPackedDecBuffer.map((val, i, ary) => myView.getUint8(i)); + // Answer was `var N1 = Mat` which is part of the compiled parquetjs library. + // Undid some changes and now it's `module.exports = read`... + console.log(decoder.decode(res)); + + const reader = {view: myView, offset: 0}; + const values = new Array(26).fill(0); + const expected = [ + 1, 1, 1, 1, 0, 1, 1, 0, + 0, 1, 1, 1, 0, 1, 1, 0, + 1, 1, 0, 0, 1, 1, 1, 0, + 0, 0 + ]; + readRleBitPackedHybrid(reader, 1, 13, values, true); + expect(values.length).equals(expected.length) + // correct? + values.forEach((val, i) => { + expect(val, `${val} != ${expected[i]} for i = ${i}`).equals(expected[i]) + }) + }); + + describe('#decodeRunBitpacked', function () { + // use the example from the documentation for RLE/Bitpacked hybrid, + // https://parquet.apache.org/docs/file-format/data-pages/encodings/#RLE + it('writes and reads bit packed values for documentation example correctly', () => { + const decVals = [0, 1, 2, 3, 4, 5, 6, 7] + + const bytesOfEncodedData = 4 + const buffer = new ArrayBuffer(bytesOfEncodedData) + + const view = new DataView(buffer); + + // the number of values? or the number of bits used for encoding a set of values ? + const bitPackedRunLength = 8; + const bitPackedScaledRunLength = bitPackedRunLength / 8; + // in the grammar it says it's EITHER the left-shifted value OR 1 if the shifted value is 0?? but that makes + // no sense and results in an error. + const shiftedBPSRL = (bitPackedScaledRunLength << 1) | 1; + + view.setUint8(0, shiftedBPSRL); + + // byte values 1-3, from the example + view.setUint8(1, 0b10001000); + view.setUint8(2, 0b11000110); + view.setUint8(3, 0b11111010); + + const bitWidth = 3; + + // number of expected values in the result = 8 + const cursor = {buffer: Buffer.from(view.buffer), offset: 0} + const values = decodeRunBitpacked(cursor, decVals.length, {bitWidth}); + expect(values.length).equals(decVals.length); + values.forEach((val, i) => { + expect(val).equals(decVals[i]); + }) + }); + }); +}); diff --git a/test/test-files.js b/test/test-files.js index 15da0578..1be377b5 100644 --- a/test/test-files.js +++ b/test/test-files.js @@ -81,7 +81,7 @@ describe('test-files', function () { }); // repeated values - it.skip('nation.dict.parquet loads', async function () { + it('nation.dict.parquet loads', async function() { await check('nation.dict.parquet', ['nation_key', 'name', 'region_key', 'comment_col']); }); @@ -227,7 +227,7 @@ describe('test-files', function () { describe('RLE', function () { // Tracked in https://github.com/LibertyDSNP/parquetjs/issues/113 - it.skip('rle_boolean_encoding.parquet loads', async function () { + it('rle_boolean_encoding.parquet loads', async function () { const data = await readData('rle/rle_boolean_encoding.parquet'); assert.deepEqual(data[0], { datatype_boolean: true }); assert.deepEqual(data[1], { datatype_boolean: false });