From bc2ca9a49af7e618ab3415206312c71aa2a6c3ac Mon Sep 17 00:00:00 2001 From: shannonwells Date: Wed, 21 Aug 2024 14:26:40 -0700 Subject: [PATCH 1/6] WIP debugging RLE --- lib/codec/rle.ts | 8 +++++++- lib/reader.ts | 8 +++++--- lib/types.ts | 1 + 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/lib/codec/rle.ts b/lib/codec/rle.ts index 32a43e18..abf991f6 100644 --- a/lib/codec/rle.ts +++ b/lib/codec/rle.ts @@ -109,6 +109,9 @@ function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: num if (count % 8 !== 0) { throw new Error('must be a multiple of 8'); } + if (!opts.bitWidth) { + throw new Error("bitWidth must be set") + } const values = new Array(count).fill(0); for (let b = 0; b < opts.bitWidth * count; ++b) { @@ -146,6 +149,7 @@ export const decodeValues = function ( throw new Error('bitWidth is required'); } + // disableEnvelope is undefined and so is bitWidth if (!opts.disableEnvelope) { cursor.offset += 4; } @@ -153,6 +157,9 @@ export const decodeValues = function ( let values = []; let res; + // Buffer index 23 = 136, this is an array of bytes, why is it "decoding" the value and adding an offset?? + // it was decoding to 7 and taking the cursor offset out decodes to 136, + // but I don't know that this is even correctly decoding. while (values.length < count) { const header = varint.decode(cursor.buffer, cursor.offset); cursor.offset += varint.encodingLength(header); @@ -171,6 +178,5 @@ export const decodeValues = function ( if (values.length !== count) { throw new Error('invalid RLE encoding'); } - return values; }; diff --git a/lib/reader.ts b/lib/reader.ts index 8833538b..aca08e6b 100644 --- a/lib/reader.ts +++ b/lib/reader.ts @@ -95,6 +95,7 @@ class ParquetCursor { this.columnList ); + // now this one is *@($&ing up, it's dematerializing records this.rowGroup = parquet_shredder.materializeRecords(this.schema, rowBuffer); this.rowGroupIndex++; this.cursorIndex = 0; @@ -739,7 +740,7 @@ export class ParquetEnvelopeReader { buffer.columnData![colKey.join(',')] = await this.readColumnChunk(schema, colChunk); } - + console.log("buffer columnData outside forEach: ", buffer.columnData); return buffer; } @@ -994,7 +995,7 @@ async function decodeDictionaryPage(cursor: Cursor, header: parquet_thrift.PageH ); } -async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options) { +async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options): Promise> { const cursorEnd = cursor.offset + header.compressed_page_size; const dataPageHeader = header.data_page_header!; @@ -1070,7 +1071,7 @@ async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader, }; } -async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options) { +async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options): Promise> { const cursorEnd = cursor.offset + header.compressed_page_size; const dataPageHeaderV2 = header.data_page_header_v2!; @@ -1109,6 +1110,7 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade cursor.buffer.subarray(cursor.offset, cursorEnd) ); + // with the correction, the inflated valuesBufCursor matches the rust version by this point. valuesBufCursor = { buffer: valuesBuf, offset: 0, diff --git a/lib/types.ts b/lib/types.ts index c76dcd69..8cdfd507 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -117,6 +117,7 @@ const PARQUET_LOGICAL_TYPES = new Set([ const PARQUET_LOGICAL_TYPE_DATA: Record = { BOOLEAN: { primitiveType: 'BOOLEAN', + typeLength: 1, toPrimitive: toPrimitive_BOOLEAN, fromPrimitive: fromPrimitive_BOOLEAN, }, From ab6a46c037bf556c6ad34087bfababe9f312b797 Mon Sep 17 00:00:00 2001 From: shannonwells Date: Wed, 21 Aug 2024 14:42:16 -0700 Subject: [PATCH 2/6] correct the check for bitWidth --- lib/codec/rle.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/codec/rle.ts b/lib/codec/rle.ts index abf991f6..8a40f443 100644 --- a/lib/codec/rle.ts +++ b/lib/codec/rle.ts @@ -109,9 +109,6 @@ function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: num if (count % 8 !== 0) { throw new Error('must be a multiple of 8'); } - if (!opts.bitWidth) { - throw new Error("bitWidth must be set") - } const values = new Array(count).fill(0); for (let b = 0; b < opts.bitWidth * count; ++b) { From ec68048b315db3618bce1e44c30a720cce54926f Mon Sep 17 00:00:00 2001 From: shannonwells Date: Thu, 29 Aug 2024 16:10:11 -0700 Subject: [PATCH 3/6] Working up to a point; we are going off the end of the DataView --- LICENSE.md | 8 +- lib/codec/encoding.ts | 210 +++++++++++++++++++++++++++++++++++++ lib/codec/rle.ts | 67 +++++++++--- lib/codec/types.ts | 15 +++ lib/datapageV2.ts | 55 ++++++++++ lib/reader.ts | 47 +++++++-- test/lib/codec/rle.test.ts | 53 ++++++++++ test/test-files.js | 2 +- 8 files changed, 426 insertions(+), 31 deletions(-) create mode 100644 lib/codec/encoding.ts create mode 100644 lib/datapageV2.ts create mode 100644 test/lib/codec/rle.test.ts diff --git a/LICENSE.md b/LICENSE.md index 81013701..ca7abfd2 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,6 +1,10 @@ Copyright for portions of the project are (c) 2017 ironSource Ltd. https://github.com/ironSource/parquetjs -Copyright for portions of the project are https://github.com/ZJONSSON/parquetjs -Copyright for portions of the project are https://github.com/LibertyDSNP/parquetjs + +Copyright for portions of the project are (c) https://github.com/ZJONSSON/parquetjs + +Copyright for portions of the project are (c) https://github.com/LibertyDSNP/parquetjs + +Copyright for portions of the project are (c) https://github.com/hyparam/hyparquet Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the diff --git a/lib/codec/encoding.ts b/lib/codec/encoding.ts new file mode 100644 index 00000000..444a273f --- /dev/null +++ b/lib/codec/encoding.ts @@ -0,0 +1,210 @@ +/// +/// Modified under the MIT License from +/// https://github.com/hyparam/hyparquet +/// The MIT License (MIT) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +import {ParquetType} from "../declare"; +import {DataReader, DecodedArray } from "./types"; + +/** + * Var int, also known as Unsigned LEB128. + * Var ints take 1 to 5 bytes (int32) or 1 to 10 bytes (int64). + * Reads groups of 7 low bits until high bit is 0. + * + * @param {DataReader} reader + * @returns {number} value + */ +export function readVarInt(reader: DataReader) { + let result = 0 + let shift = 0 + while (true) { + const byte = reader.view.getUint8(reader.offset++) + result |= (byte & 0x7f) << shift + if (!(byte & 0x80)) { + return result + } + shift += 7 + } +} + +/** + * Minimum bits needed to store value. + * + * @param {number} value + * @returns {number} + */ +export function bitWidth(value: number): number { + return 32 - Math.clz32(value) +} + +/** + * Read values from a run-length encoded/bit-packed hybrid encoding. + * + * If length is zero, then read int32 length at the start. + * + * @typedef {import("./types.d.ts").DataReader} DataReader + * @typedef {import("./types.d.ts").DecodedArray} DecodedArray + * @param {DataReader} reader + * @param {number} width - width of each bit-packed group + * @param {number} length - length of the encoded data, in bytes (?) + * @param {DecodedArray} output + */ +export function readRleBitPackedHybrid(reader: DataReader, width: number, length: number, output: DecodedArray) { + if (!length) { + // length = reader.view.getUint32(reader.offset, true) + reader.offset += 4 + } + let seen = 0 + while (seen < output.length) { + const header = readVarInt(reader) + if (header & 1) { + // bit-packed + seen = readBitPacked(reader, header, width, output, seen) + } else { + // rle + const count = header >>> 1 + readRle(reader, count, width, output, seen) + seen += count + } + } + // assert(reader.offset - startOffset === length) +} + +/** + * Run-length encoding: read value with bitWidth and repeat it count times. + * + * @param {DataReader} reader + * @param {number} count + * @param {number} bitWidth + * @param {DecodedArray} output + * @param {number} seen + */ +export function readRle(reader: DataReader, + count: number, + bitWidth: number, + output: DecodedArray, + seen: number) { + const width = bitWidth + 7 >> 3 + let value = 0 + for (let i = 0; i < width; i++) { + value |= reader.view.getUint8(reader.offset++) << (i << 3) + } + // assert(value < 1 << bitWidth) + + // repeat value count times + for (let i = 0; i < count; i++) { + output[seen + i] = value + } +} + +/** + * Read a bit-packed run of the rle/bitpack hybrid. + * Supports width > 8 (crossing bytes). + * + * @param {DataReader} reader + * @param {number} header - bit-pack header + * @param {number} bitWidth + * @param {DecodedArray} output + * @param {number} seen + * @returns {number} total output values so far + */ +export function readBitPacked(reader: DataReader, + header: number, + bitWidth: number, + output: DecodedArray, + seen: number): number { + let count = header >> 1 << 3 // values to read + const mask = (1 << bitWidth) - 1 + + let data = 0 + if (reader.offset < reader.view.byteLength) { + data = reader.view.getUint8(reader.offset++) + } else if (mask) { + // sometimes out-of-bounds reads are masked out + throw new Error(`parquet bitpack offset ${reader.offset} out of range`) + } + let left = 8 + let right = 0 + + // read values + while (count) { + // if we have crossed a byte boundary, shift the data + if (right > 8) { + right -= 8 + left -= 8 + data >>>= 8 + } else if (left - right < bitWidth) { + // if we don't have bitWidth number of bits to read, read next byte + data |= reader.view.getUint8(reader.offset) << left + reader.offset++ + left += 8 + } else { + if (seen < output.length) { + // emit value + output[seen++] = data >> right & mask + } + count-- + right += bitWidth + } + } + + return seen +} + +/** + * @typedef {import("./types.d.ts").ParquetType} ParquetType + * @param {DataReader} reader + * @param {number} count + * @param {ParquetType} type + * @param {number | undefined} typeLength + * @returns {DecodedArray} + */ +export function byteStreamSplit(reader: DataReader, count: number, type: ParquetType, typeLength: number|undefined) { + const width = byteWidth(type, typeLength) + const bytes = new Uint8Array(count * width) + for (let b = 0; b < width; b++) { + for (let i = 0; i < count; i++) { + bytes[i * width + b] = reader.view.getUint8(reader.offset++) + } + } + // interpret bytes as typed array + if (type === 'FLOAT') return new Float32Array(bytes.buffer) + else if (type === 'DOUBLE') return new Float64Array(bytes.buffer) + else if (type === 'INT32') return new Int32Array(bytes.buffer) + else if (type === 'INT64') return new BigInt64Array(bytes.buffer) + else if (type === 'FIXED_LEN_BYTE_ARRAY') { + // split into arrays of typeLength + const split = new Array(count) + for (let i = 0; i < count; i++) { + split[i] = bytes.subarray(i * width, (i + 1) * width) + } + return split + } + throw new Error(`parquet byte_stream_split unsupported type: ${type}`) +} + +/** + * @param {ParquetType} type + * @param {number | undefined} typeLength + * @returns {number} + */ +function byteWidth(type: ParquetType, typeLength: number|undefined): number { + switch (type) { + case 'INT32': + case 'FLOAT': + return 4 + case 'INT64': + case 'DOUBLE': + return 8 + case 'FIXED_LEN_BYTE_ARRAY': + if (!typeLength) throw new Error('parquet byteWidth missing type_length') + return typeLength + default: + throw new Error(`parquet unsupported type: ${type}`) + } +} diff --git a/lib/codec/rle.ts b/lib/codec/rle.ts index 8a40f443..c6f08290 100644 --- a/lib/codec/rle.ts +++ b/lib/codec/rle.ts @@ -4,6 +4,7 @@ import varint from 'varint'; import { Cursor } from './types'; +import {readBitPacked, readRle, readVarInt} from "./encoding"; function encodeRunBitpacked(values: number[], opts: { bitWidth: number }) { for (let i = 0; i < values.length % 8; i++) { @@ -105,10 +106,16 @@ export const encodeValues = function ( return envelope; }; -function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: number }) { - if (count % 8 !== 0) { - throw new Error('must be a multiple of 8'); - } +// opts.bitWidth is undefined when the boolean values are being passed +// decode a bitpacked value +// setting old code to true here only results in the RLE/bitpacked hybrid test failing, so we know that code is bad. +export function decodeRunBitpacked(cursor : Cursor, count: number, opts: { bitWidth: number }): Array { + const run_old_code = false; + let output = new Array(count).fill(0); + if (run_old_code) { + if (count % 8 !== 0) { + throw 'must be a multiple of 8'; + } const values = new Array(count).fill(0); for (let b = 0; b < opts.bitWidth * count; ++b) { @@ -117,23 +124,49 @@ function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: num } } - cursor.offset += opts.bitWidth * (count / 8); - return values; + cursor.offset += opts.bitWidth * (count / 8); + } else { + const view = new DataView(cursor.buffer.buffer); + const reader = { view, offset: cursor.offset } + const header = readVarInt(reader); + readBitPacked(reader, header, opts.bitWidth, output, 0) + } + console.log({output}) + return output; } -function decodeRunRepeated(cursor: Cursor, count: number, opts: { bitWidth: number }) { - const bytesNeededForFixedBitWidth = Math.ceil(opts.bitWidth / 8); - let value = 0; - - for (let i = 0; i < bytesNeededForFixedBitWidth; ++i) { +// decode an RLE value +// Note that the RLE encoding method is only supported for the following types of data: +// +// Repetition and definition levels +// Dictionary indices +// Boolean values in data pages, as an alternative to PLAIN encoding +// See https://parquet.apache.org/docs/file-format/data-pages/encodings/ +// setting this to run old code lets the RLE/bitpacked hybrid documentation example still pass. +// So maybe this code is fine. +export function decodeRunRepeated(cursor: Cursor, count: number, opts: { bitWidth: number }): Array { + const run_old_code = true; + let output = new Array(count).fill(0); + if (run_old_code) { + var bytesNeededForFixedBitWidth = Math.ceil(opts.bitWidth / 8); + let value = 0; + + for (let i = 0; i < bytesNeededForFixedBitWidth; ++i) { const byte = cursor.buffer[cursor.offset]; - // Bytes are stored LSB to MSB, so we need to shift - // each new byte appropriately. - value += byte << (i * 8); - cursor.offset += 1; - } + // Bytes are stored LSB to MSB, so we need to shift + // each new byte appropriately. + value += byte << (i * 8); + cursor.offset += 1; + } - return new Array(count).fill(value); + output = new Array(count).fill(value); + } else { + const view = new DataView(cursor.buffer.buffer); + const reader = { view, offset: cursor.offset}; + readRle(reader, count, opts.bitWidth, output, 0); + } + console.log({output}); + return output; } export const decodeValues = function ( diff --git a/lib/codec/types.ts b/lib/codec/types.ts index d83438b2..32e0d02f 100644 --- a/lib/codec/types.ts +++ b/lib/codec/types.ts @@ -30,3 +30,18 @@ export interface Cursor { offset: number; size?: number; } + +export interface DataReader { + view: DataView + offset: number +} + +export type DecodedArray = + Uint8Array | + Int32Array | + BigInt64Array | + BigUint64Array | + Float32Array | + Float64Array | + any[] + diff --git a/lib/datapageV2.ts b/lib/datapageV2.ts new file mode 100644 index 00000000..04e57e6f --- /dev/null +++ b/lib/datapageV2.ts @@ -0,0 +1,55 @@ +/// +/// Modified under the MIT License from +/// https://github.com/hyparam/hyparquet +/// The MIT License (MIT) +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +import {DataReader} from "./codec/types"; +import {DataPageHeaderV2} from "../gen-nodejs/parquet_types"; +import {readRleBitPackedHybrid} from "./codec/encoding"; +import {getBitWidth} from "./util"; + +/** + * @typedef {import("./types.d.ts").DataReader} DataReader + * @param {DataReader} reader + * @param {DataPageHeaderV2} daph2 data page header v2 + * @param {number} rLevelMax the maximum of repetition levels + * @returns {any[]} repetition levels + */ +export const readRepetitionLevelsV2 = (reader: DataReader, + daph2: DataPageHeaderV2, + rLevelMax: number, +): Array => { + if (!rLevelMax) return [] + const values = new Array(daph2.num_values) + const bitWidth = getBitWidth(rLevelMax) + readRleBitPackedHybrid( + reader, bitWidth, daph2.repetition_levels_byte_length, values + ) + return values +} + +/** + * @typedef {import("./types.d.ts").DataReader} DataReader + * @param {DataReader} reader + * @param {DataPageHeaderV2} daph2 data page header v2 + * @param {number} dLevelMax the maximum of definition levels + * @returns {number[] | undefined} definition levels + */ +export const readDefinitionLevelsV2 = (reader: DataReader, + daph2: DataPageHeaderV2, + dLevelMax: number, +): Array|undefined => { + if (dLevelMax) { + // V2 we know the length + const values = new Array(daph2.num_values) + const bitWidth = getBitWidth(dLevelMax) + readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, values) + return values + } +} diff --git a/lib/reader.ts b/lib/reader.ts index aca08e6b..157939c1 100644 --- a/lib/reader.ts +++ b/lib/reader.ts @@ -27,6 +27,7 @@ import { Cursor, Options } from './codec/types'; import { GetObjectCommand, HeadObjectCommand, S3Client } from '@aws-sdk/client-s3'; import type { Readable } from 'stream'; import type { Blob } from 'buffer'; +import {readDefinitionLevelsV2, readRepetitionLevelsV2} from "./datapageV2"; const { getBloomFiltersFor } = bloomFilterReader; @@ -162,7 +163,7 @@ export class ParquetReader { } /** - * Open the parquet file from a url using the supplied request module + * Open the parquet file from a URL using the supplied request module * params should either be a string (url) or an object that includes * a `url` property. * This function returns a new parquet reader @@ -549,9 +550,7 @@ export class ParquetEnvelopeReader { const headers = Object.assign({}, defaultHeaders, { range }); const response = await fetch(url, { headers }); const arrayBuffer = await response.arrayBuffer(); - const buffer = Buffer.from(arrayBuffer); - - return buffer; + return Buffer.from(arrayBuffer); }; const closeFn = () => ({}); @@ -1080,27 +1079,53 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade const valueEncoding = parquet_util.getThriftEnum(parquet_thrift.Encoding, dataPageHeaderV2.encoding); /* read repetition levels */ - let rLevels = new Array(valueCount); - if (opts.rLevelMax! > 0) { + let use_old_rlevels = false; + let rLevels: Array; + let reader: DataReader = { + view: new DataView(cursor.buffer.buffer, cursor.offset), + offset: 0 + } + + if (use_old_rlevels) { + rLevels = new Array(valueCount); + if (opts.rLevelMax! > 0) { rLevels = decodeValues(PARQUET_RDLVL_TYPE, PARQUET_RDLVL_ENCODING, cursor, valueCount, { bitWidth: parquet_util.getBitWidth(opts.rLevelMax!), disableEnvelope: true, - }); + }); + } else { + rLevels.fill(0); + } } else { - rLevels.fill(0); + rLevels = readRepetitionLevelsV2(reader, dataPageHeaderV2, opts.rLevelMax || 0); + reader.offset = dataPageHeaderV2.repetition_levels_byte_length; + console.log("reader offset: ", reader.offset) } /* read definition levels */ - let dLevels = new Array(valueCount); - if (opts.dLevelMax! > 0) { + let dLevels: Array|undefined; + let use_old_dlevels = false; + if (use_old_dlevels) { + + dLevels = new Array(valueCount) + if (opts.dLevelMax! > 0) { dLevels = decodeValues(PARQUET_RDLVL_TYPE, PARQUET_RDLVL_ENCODING, cursor, valueCount, { bitWidth: parquet_util.getBitWidth(opts.dLevelMax!), disableEnvelope: true, }); + } else { + dLevels.fill(0); + } + } else { - dLevels.fill(0); + dLevels = readDefinitionLevelsV2(reader, dataPageHeaderV2, opts.dLevelMax || 0) + console.log("reader offset: ", reader.offset); } + cursor.offset += reader.offset + console.log("cursor.offset after reading dLevels: ", cursor.offset) + + // by this point the cursor offset should be 13. /* read values */ let valuesBufCursor = cursor; diff --git a/test/lib/codec/rle.test.ts b/test/lib/codec/rle.test.ts new file mode 100644 index 00000000..dc51f6cf --- /dev/null +++ b/test/lib/codec/rle.test.ts @@ -0,0 +1,53 @@ +import { expect } from 'chai'; +import { decodeRunBitpacked } from '../../../lib/codec/rle'; +import type { Cursor } from '../../../lib/codec/types'; + +describe('RLE Codec', function () { + describe('#decodeRunBitpacked', function () { + it('can decode a known bitpack value', function () { + const cursor: Cursor = { + // 136, 1, left off the front + buffer: Buffer.from([7, 251, 127, 127, 28, 1, 9, 254, 251, 191, 63]), + offset: 0, + }; + const values = decodeRunBitpacked(cursor, 24, { bitWidth: 1 }); + expect(values.length).equals(24); + console.log(values) + }); + + // use the example from the documentation for RLE/Bitpacked hybrid, + // https://parquet.apache.org/docs/file-format/data-pages/encodings/#RLE + it('writes and reads bit packed values for documentation example correctly', () => { + const decVals = [0, 1, 2, 3, 4, 5, 6, 7] + + const bytesOfEncodedData = 4 + const buffer = new ArrayBuffer(bytesOfEncodedData) + + const view = new DataView(buffer); + + // the number of values? or the number of bits used for encoding a set of values ? + const bitPackedRunLength = 8; + const bitPackedScaledRunLength = bitPackedRunLength/8; + // in the grammar it says it's EITHER the left-shifted value OR 1 if the shifted value is 0?? but that makes + // no sense and results in an error. + const shiftedBPSRL = (bitPackedScaledRunLength << 1) | 1; + + view.setUint8(0, shiftedBPSRL); + + // byte values 1-3, from the example + view.setUint8(1, 0b10001000); + view.setUint8(2, 0b11000110); + view.setUint8(3, 0b11111010); + + const bitWidth = 3; + + // number of expected values in the result = 8 + const cursor = { buffer: Buffer.from(view.buffer), offset: 0 } + const values = decodeRunBitpacked(cursor, decVals.length, {bitWidth} ); + expect(values.length).equals(decVals.length); + values.forEach((val,i) => { + expect(val).equals(decVals[i]); + }) + }); + }); +}); diff --git a/test/test-files.js b/test/test-files.js index 15da0578..94d967bb 100644 --- a/test/test-files.js +++ b/test/test-files.js @@ -81,7 +81,7 @@ describe('test-files', function () { }); // repeated values - it.skip('nation.dict.parquet loads', async function () { + it('nation.dict.parquet loads', async function() { await check('nation.dict.parquet', ['nation_key', 'name', 'region_key', 'comment_col']); }); From 6eeff4d99cd38b0ddc3b84aa048048e64b5020a4 Mon Sep 17 00:00:00 2001 From: shannonwells Date: Wed, 4 Sep 2024 15:12:52 -0700 Subject: [PATCH 4/6] codec_rle examples pass --- lib/codec/encoding.ts | 11 ++--- lib/codec/plain_dictionary.ts | 12 ++++- lib/codec/rle.ts | 72 +++++++++++++----------------- lib/datapageV2.ts | 10 ++--- lib/reader.ts | 83 ++++++++++++++--------------------- test/codec_rle.js | 67 +++++++++++++++++++--------- test/lib/codec/rle.test.ts | 1 - 7 files changed, 131 insertions(+), 125 deletions(-) diff --git a/lib/codec/encoding.ts b/lib/codec/encoding.ts index 444a273f..fa5bec4e 100644 --- a/lib/codec/encoding.ts +++ b/lib/codec/encoding.ts @@ -54,11 +54,12 @@ export function bitWidth(value: number): number { * @param {number} length - length of the encoded data, in bytes (?) * @param {DecodedArray} output */ -export function readRleBitPackedHybrid(reader: DataReader, width: number, length: number, output: DecodedArray) { - if (!length) { - // length = reader.view.getUint32(reader.offset, true) +export function readRleBitPackedHybrid(reader: DataReader, width: number, length: number, output: DecodedArray, disableEnvelope?: boolean) { + + if (!disableEnvelope) { reader.offset += 4 } + const startOffset = reader.offset; let seen = 0 while (seen < output.length) { const header = readVarInt(reader) @@ -72,7 +73,7 @@ export function readRleBitPackedHybrid(reader: DataReader, width: number, length seen += count } } - // assert(reader.offset - startOffset === length) + console.assert(reader.offset - startOffset === length) } /** @@ -120,7 +121,7 @@ export function readBitPacked(reader: DataReader, seen: number): number { let count = header >> 1 << 3 // values to read const mask = (1 << bitWidth) - 1 - + // when reading definition levels v2 on readColumnChunk, ArrayBuffer len is 69 only let data = 0 if (reader.offset < reader.view.byteLength) { data = reader.view.getUint8(reader.offset++) diff --git a/lib/codec/plain_dictionary.ts b/lib/codec/plain_dictionary.ts index 916730b3..6e941c13 100644 --- a/lib/codec/plain_dictionary.ts +++ b/lib/codec/plain_dictionary.ts @@ -1,8 +1,16 @@ import * as rle from './rle'; -import { Cursor, Options } from './types'; +import {Cursor, DataReader, DecodedArray, Options} from './types' +import {readRleBitPackedHybrid} from "./encoding"; export const decodeValues = function (type: string, cursor: Cursor, count: number, opts: Options) { const bitWidth = cursor.buffer.subarray(cursor.offset, cursor.offset + 1).readInt8(0); cursor.offset += 1; - return rle.decodeValues(type, cursor, count, Object.assign({}, opts, { disableEnvelope: true, bitWidth })); + const reader: DataReader = { + view: new DataView(cursor.buffer.buffer, cursor.offset), + offset: 0, + } + let output: DecodedArray = new Array(count); + readRleBitPackedHybrid(reader, bitWidth, count, output, true) + cursor.offset += reader.offset + return output; }; diff --git a/lib/codec/rle.ts b/lib/codec/rle.ts index c6f08290..9ac8b001 100644 --- a/lib/codec/rle.ts +++ b/lib/codec/rle.ts @@ -4,7 +4,7 @@ import varint from 'varint'; import { Cursor } from './types'; -import {readBitPacked, readRle, readVarInt} from "./encoding"; +import {readBitPacked, readRle, readRleBitPackedHybrid, readVarInt} from "./encoding"; function encodeRunBitpacked(values: number[], opts: { bitWidth: number }) { for (let i = 0; i < values.length % 8; i++) { @@ -45,9 +45,11 @@ function unknownToParsedInt(value: string | number) { export const encodeValues = function ( type: string, - values: number[], - opts: { bitWidth: number; disableEnvelope?: boolean } -) { + values: Array, + opts: { + bitWidth: number, + disableEnvelope?: boolean + }) { if (!('bitWidth' in opts)) { throw new Error('bitWidth is required'); } @@ -109,8 +111,8 @@ export const encodeValues = function ( // opts.bitWidth is undefined when the boolean values are being passed // decode a bitpacked value // setting old code to true here only results in the RLE/bitpacked hybrid test failing, so we know that code is bad. -export function decodeRunBitpacked(cursor : Cursor, count: number, opts: { bitWidth: number }): Array { - const run_old_code = false; +export function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: number }): Array { + const run_old_code = true; let output = new Array(count).fill(0); if (run_old_code) { if (count % 8 !== 0) { @@ -126,12 +128,12 @@ export function decodeRunBitpacked(cursor : Cursor, count: number, opts: { bitWi cursor.offset += opts.bitWidth * (count / 8); } else { - const view = new DataView(cursor.buffer.buffer); - const reader = { view, offset: cursor.offset } + let arrayBuf = cursor.buffer.buffer.slice(0, count); + const view = new DataView(arrayBuf, cursor.offset, count); + const reader = {view, offset: 0} const header = readVarInt(reader); readBitPacked(reader, header, opts.bitWidth, output, 0) } - console.log({output}) return output; } @@ -145,51 +147,38 @@ export function decodeRunBitpacked(cursor : Cursor, count: number, opts: { bitWi // setting this to run old code lets the RLE/bitpacked hybrid documentation example still pass. // So maybe this code is fine. export function decodeRunRepeated(cursor: Cursor, count: number, opts: { bitWidth: number }): Array { - const run_old_code = true; let output = new Array(count).fill(0); - if (run_old_code) { - var bytesNeededForFixedBitWidth = Math.ceil(opts.bitWidth / 8); - let value = 0; + var bytesNeededForFixedBitWidth = Math.ceil(opts.bitWidth / 8); + let value = 0; - for (let i = 0; i < bytesNeededForFixedBitWidth; ++i) { + for (let i = 0; i < bytesNeededForFixedBitWidth; ++i) { const byte = cursor.buffer[cursor.offset]; - // Bytes are stored LSB to MSB, so we need to shift - // each new byte appropriately. - value += byte << (i * 8); - cursor.offset += 1; - } - - output = new Array(count).fill(value); - } else { - const view = new DataView(cursor.buffer.buffer); - const reader = { view, offset: cursor.offset}; - readRle(reader, count, opts.bitWidth, output, 0); + // Bytes are stored LSB to MSB, so we need to shift + // each new byte appropriately. + value += byte << (i * 8); + cursor.offset += 1; } - console.log({output}); + + output = new Array(count).fill(value); return output; } -export const decodeValues = function ( - _: string, - cursor: Cursor, - count: number, - opts: { bitWidth: number; disableEnvelope?: boolean } -) { +// cursor: contains buffer + offset for data +// count: the number of items expected to decode +// opts: must include bitWidth, disableEnvelope is optional, specify true to use all bytes, false to skip first four +// bytes +export const decodeValues = function (_: string, cursor: Cursor, count: number, opts: { + bitWidth: number, + disableEnvelope?: boolean +}) { if (!('bitWidth' in opts)) { throw new Error('bitWidth is required'); } - - // disableEnvelope is undefined and so is bitWidth + let values = []; + let res; if (!opts.disableEnvelope) { cursor.offset += 4; } - - let values = []; - let res; - - // Buffer index 23 = 136, this is an array of bytes, why is it "decoding" the value and adding an offset?? - // it was decoding to 7 and taking the cursor offset out decodes to 136, - // but I don't know that this is even correctly decoding. while (values.length < count) { const header = varint.decode(cursor.buffer, cursor.offset); cursor.offset += varint.encodingLength(header); @@ -208,5 +197,6 @@ export const decodeValues = function ( if (values.length !== count) { throw new Error('invalid RLE encoding'); } + return values; }; diff --git a/lib/datapageV2.ts b/lib/datapageV2.ts index 04e57e6f..697b3b78 100644 --- a/lib/datapageV2.ts +++ b/lib/datapageV2.ts @@ -25,12 +25,11 @@ export const readRepetitionLevelsV2 = (reader: DataReader, daph2: DataPageHeaderV2, rLevelMax: number, ): Array => { - if (!rLevelMax) return [] const values = new Array(daph2.num_values) + if (!rLevelMax) return values.fill(0); const bitWidth = getBitWidth(rLevelMax) - readRleBitPackedHybrid( - reader, bitWidth, daph2.repetition_levels_byte_length, values - ) + let disableEnvelope = daph2.definition_levels_byte_length === 0 + readRleBitPackedHybrid(reader, bitWidth, daph2.repetition_levels_byte_length, values, disableEnvelope) return values } @@ -49,7 +48,8 @@ export const readDefinitionLevelsV2 = (reader: DataReader, // V2 we know the length const values = new Array(daph2.num_values) const bitWidth = getBitWidth(dLevelMax) - readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, values) + let disableEnvelope = daph2.definition_levels_byte_length === 0 + readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, values, disableEnvelope) return values } } diff --git a/lib/reader.ts b/lib/reader.ts index 157939c1..35d81870 100644 --- a/lib/reader.ts +++ b/lib/reader.ts @@ -145,7 +145,7 @@ export class ParquetReader { /** * Open the parquet file from S3 using the supplied aws client [, commands] and params * The params have to include `Bucket` and `Key` to the file requested, - * If using v3 of the AWS SDK, combine the client and commands into an object wiht keys matching + * If using v3 of the AWS SDK, combine the client and commands into an object with keys matching * the original module names, and do not instantiate the commands; pass them as classes/modules. * * This function returns a new parquet reader [ or throws an Error.] @@ -739,7 +739,6 @@ export class ParquetEnvelopeReader { buffer.columnData![colKey.join(',')] = await this.readColumnChunk(schema, colChunk); } - console.log("buffer columnData outside forEach: ", buffer.columnData); return buffer; } @@ -944,16 +943,20 @@ async function decodePages(buffer: Buffer, opts: Options) { pageData.values = pageData.values!.map((d) => opts.dictionary![d]); } - const length = pageData.rlevels != undefined ? pageData.rlevels.length : 0; + const length = pageData.rlevels !== undefined ? pageData.dlevels?.length : 0; - for (let i = 0; i < length; i++) { - data.rlevels!.push(pageData.rlevels![i]); - data.dlevels!.push(pageData.dlevels![i]); - const value = pageData.values![i]; - if (value !== undefined) { - data.values!.push(value); - } + if (pageData.rlevels?.length) { + data.rlevels = pageData.rlevels; + } + if (pageData.dlevels?.length) { + data.dlevels = pageData.dlevels; + } + if (length > 0) { + pageData.values?.forEach(val => { + if (val !== undefined) data.values!.push(val) + }) } + data.count! += pageData.count!; data.pageHeaders!.push(pageData.pageHeader!); } @@ -1070,6 +1073,12 @@ async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader, }; } +// ensures minimum allocation of ArrayBuffer for the DataView. +function dataViewFromCursor(cursor: Cursor, offset?: number): DataView { + // @ts-ignore + return new DataView(cursor.buffer.buffer, cursor.offset); +} + async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options): Promise> { const cursorEnd = cursor.offset + header.compressed_page_size; const dataPageHeaderV2 = header.data_page_header_v2!; @@ -1079,63 +1088,30 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade const valueEncoding = parquet_util.getThriftEnum(parquet_thrift.Encoding, dataPageHeaderV2.encoding); /* read repetition levels */ - let use_old_rlevels = false; + const use_old_rlevels = false; let rLevels: Array; let reader: DataReader = { - view: new DataView(cursor.buffer.buffer, cursor.offset), + view: dataViewFromCursor(cursor), offset: 0 } - if (use_old_rlevels) { - rLevels = new Array(valueCount); - if (opts.rLevelMax! > 0) { - rLevels = decodeValues(PARQUET_RDLVL_TYPE, PARQUET_RDLVL_ENCODING, cursor, valueCount, { - bitWidth: parquet_util.getBitWidth(opts.rLevelMax!), - disableEnvelope: true, - }); - } else { - rLevels.fill(0); - } - } else { - rLevels = readRepetitionLevelsV2(reader, dataPageHeaderV2, opts.rLevelMax || 0); - reader.offset = dataPageHeaderV2.repetition_levels_byte_length; - console.log("reader offset: ", reader.offset) - } + rLevels = readRepetitionLevelsV2(reader, dataPageHeaderV2, opts.rLevelMax || 0); + reader.offset = dataPageHeaderV2.repetition_levels_byte_length; /* read definition levels */ let dLevels: Array|undefined; - let use_old_dlevels = false; - if (use_old_dlevels) { + dLevels = readDefinitionLevelsV2(reader, dataPageHeaderV2, opts.dLevelMax || 0) + cursor.offset += reader.offset; - dLevels = new Array(valueCount) - if (opts.dLevelMax! > 0) { - dLevels = decodeValues(PARQUET_RDLVL_TYPE, PARQUET_RDLVL_ENCODING, cursor, valueCount, { - bitWidth: parquet_util.getBitWidth(opts.dLevelMax!), - disableEnvelope: true, - }); - } else { - dLevels.fill(0); - } - } else { - dLevels = readDefinitionLevelsV2(reader, dataPageHeaderV2, opts.dLevelMax || 0) - console.log("reader offset: ", reader.offset); - } - cursor.offset += reader.offset - console.log("cursor.offset after reading dLevels: ", cursor.offset) - - - // by this point the cursor offset should be 13. /* read values */ let valuesBufCursor = cursor; if (dataPageHeaderV2.is_compressed) { const valuesBuf = await parquet_compression.inflate( opts.compression!, - cursor.buffer.subarray(cursor.offset, cursorEnd) - ); + cursor.buffer.subarray(cursor.offset, cursorEnd)); - // with the correction, the inflated valuesBufCursor matches the rust version by this point. valuesBufCursor = { buffer: valuesBuf, offset: 0, @@ -1145,7 +1121,12 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade cursor.offset = cursorEnd; } - const values = decodeValues(opts.type!, valueEncoding as ParquetCodec, valuesBufCursor, valueCountNonNull, { + const values = decodeValues( + opts.type!, + valueEncoding as ParquetCodec, + valuesBufCursor, + valueCountNonNull, + { bitWidth: opts.column!.typeLength!, treatInt96AsTimestamp: opts.treatInt96AsTimestamp, ...opts.column!, diff --git a/test/codec_rle.js b/test/codec_rle.js index 7b2528ad..f5ff1537 100644 --- a/test/codec_rle.js +++ b/test/codec_rle.js @@ -2,6 +2,14 @@ const chai = require('chai'); const assert = chai.assert; const parquet_codec_rle = require('../lib/codec/rle'); +const {readRleBitPackedHybrid} = require("../lib/codec/encoding"); + +function dataViewFromArray(data) { + const ab = new ArrayBuffer(data.length,{ maxByteLength: data.length }); + let view = new DataView(ab, 0); + data.forEach((val,idx) => view.setUint8(idx, val)); + return view; +} describe('ParquetCodec::RLE', function () { it('should encode bitpacked values', function () { @@ -72,20 +80,30 @@ describe('ParquetCodec::RLE', function () { }); it('should decode repeated values', function () { + const data = [0x10, 0x87, 0xd6, 0x12]; + let cursor = { + buffer: Buffer.from(data), + offset: 0, + }; + const expectedDecoded = [1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567] + const bitWidth = 21; + const disableEnvelope = true; let vals = parquet_codec_rle.decodeValues( - 'INT32', - { - buffer: Buffer.from([0x10, 0x87, 0xd6, 0x12]), - offset: 0, - }, - 8, - { - disableEnvelope: true, - bitWidth: 21, - } + 'UNUSED', + cursor, + expectedDecoded.length, + { disableEnvelope, bitWidth} ); - assert.deepEqual(vals, [1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567, 1234567]); + assert.deepEqual(vals, expectedDecoded); + + let view = dataViewFromArray(data); + const reader = { view , offset: 0 }; + let output = new Array(expectedDecoded.length); + readRleBitPackedHybrid(reader, bitWidth, data.length, output, disableEnvelope) + + assert.deepEqual(output, expectedDecoded); + }); it('should encode mixed runs', function () { @@ -102,19 +120,28 @@ describe('ParquetCodec::RLE', function () { }); it('should decode mixed runs', function () { + const expectedDecoded = [0, 1, 2, 3, 4, 5, 6, 7, 4, 4, 4, 4, 4, 4, 4, 4, 0, 1, 2, 3, 4, 5, 6, 7]; + const data = [0x03, 0x88, 0xc6, 0xfa, 0x10, 0x04, 0x03, 0x88, 0xc6, 0xfa]; + const disableEnvelope = true; + const bitWidth = 3; + let vals = parquet_codec_rle.decodeValues( - 'INT32', + 'UNUSED', { - buffer: Buffer.from([0x03, 0x88, 0xc6, 0xfa, 0x10, 0x04, 0x03, 0x88, 0xc6, 0xfa]), + buffer: Buffer.from(data), offset: 0, }, - 24, - { - disableEnvelope: true, - bitWidth: 3, - } - ); + expectedDecoded.length, + { disableEnvelope, bitWidth }); + + assert.deepEqual(vals,expectedDecoded); + + let view = dataViewFromArray(data); + const reader = { view , offset: 0 }; + let output = new Array(expectedDecoded.length); + readRleBitPackedHybrid(reader, bitWidth, data.length, output, disableEnvelope) + + assert.deepEqual(output, expectedDecoded); - assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7, 4, 4, 4, 4, 4, 4, 4, 4, 0, 1, 2, 3, 4, 5, 6, 7]); }); }); diff --git a/test/lib/codec/rle.test.ts b/test/lib/codec/rle.test.ts index dc51f6cf..f22c3de7 100644 --- a/test/lib/codec/rle.test.ts +++ b/test/lib/codec/rle.test.ts @@ -12,7 +12,6 @@ describe('RLE Codec', function () { }; const values = decodeRunBitpacked(cursor, 24, { bitWidth: 1 }); expect(values.length).equals(24); - console.log(values) }); // use the example from the documentation for RLE/Bitpacked hybrid, From 2331aa03cab092d36f537231f06e6f8d6e35b77a Mon Sep 17 00:00:00 2001 From: shannonwells Date: Fri, 13 Sep 2024 11:28:26 -0700 Subject: [PATCH 5/6] WIP testing Buffer issue --- lib/codec/encoding.ts | 3 ++- lib/codec/plain_dictionary.ts | 6 +++-- lib/codec/rle.ts | 9 ++++--- lib/datapageV2.ts | 2 +- lib/reader.ts | 16 +++-------- lib/util.ts | 7 +++++ test/codec_rle.js | 2 +- test/lib/codec/rle.test.ts | 51 +++++++++++++++++++++++------------ 8 files changed, 58 insertions(+), 38 deletions(-) diff --git a/lib/codec/encoding.ts b/lib/codec/encoding.ts index fa5bec4e..8e3b37f6 100644 --- a/lib/codec/encoding.ts +++ b/lib/codec/encoding.ts @@ -53,6 +53,7 @@ export function bitWidth(value: number): number { * @param {number} width - width of each bit-packed group * @param {number} length - length of the encoded data, in bytes (?) * @param {DecodedArray} output + * @param {disableEnvelope} - set to true to consume entire buffer, false to assume (and therefore skip) a 4 byte header */ export function readRleBitPackedHybrid(reader: DataReader, width: number, length: number, output: DecodedArray, disableEnvelope?: boolean) { @@ -121,7 +122,7 @@ export function readBitPacked(reader: DataReader, seen: number): number { let count = header >> 1 << 3 // values to read const mask = (1 << bitWidth) - 1 - // when reading definition levels v2 on readColumnChunk, ArrayBuffer len is 69 only + let data = 0 if (reader.offset < reader.view.byteLength) { data = reader.view.getUint8(reader.offset++) diff --git a/lib/codec/plain_dictionary.ts b/lib/codec/plain_dictionary.ts index 6e941c13..d12423ba 100644 --- a/lib/codec/plain_dictionary.ts +++ b/lib/codec/plain_dictionary.ts @@ -5,12 +5,14 @@ import {readRleBitPackedHybrid} from "./encoding"; export const decodeValues = function (type: string, cursor: Cursor, count: number, opts: Options) { const bitWidth = cursor.buffer.subarray(cursor.offset, cursor.offset + 1).readInt8(0); cursor.offset += 1; + // old: + // return rle.decodeValues(type, cursor, count, Object.assign({}, opts, { disableEnvelope: true, bitWidth })); const reader: DataReader = { view: new DataView(cursor.buffer.buffer, cursor.offset), offset: 0, } let output: DecodedArray = new Array(count); - readRleBitPackedHybrid(reader, bitWidth, count, output, true) - cursor.offset += reader.offset + const disableEnvelope = true; + readRleBitPackedHybrid(reader, bitWidth, count, output, disableEnvelope) return output; }; diff --git a/lib/codec/rle.ts b/lib/codec/rle.ts index 9ac8b001..57a1e019 100644 --- a/lib/codec/rle.ts +++ b/lib/codec/rle.ts @@ -3,7 +3,7 @@ // https://github.com/apache/parquet-format/blob/master/Encodings.md import varint from 'varint'; -import { Cursor } from './types'; +import {Cursor} from './types'; import {readBitPacked, readRle, readRleBitPackedHybrid, readVarInt} from "./encoding"; function encodeRunBitpacked(values: number[], opts: { bitWidth: number }) { @@ -111,6 +111,10 @@ export const encodeValues = function ( // opts.bitWidth is undefined when the boolean values are being passed // decode a bitpacked value // setting old code to true here only results in the RLE/bitpacked hybrid test failing, so we know that code is bad. +// cursor: Cursor containing the data to be decoded +// count: the number of values expected to result from the decoding +// opts: bitWidth is required. +// returns: a DecodedArray export function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: number }): Array { const run_old_code = true; let output = new Array(count).fill(0); @@ -128,8 +132,7 @@ export function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWid cursor.offset += opts.bitWidth * (count / 8); } else { - let arrayBuf = cursor.buffer.buffer.slice(0, count); - const view = new DataView(arrayBuf, cursor.offset, count); + const view = new DataView(cursor.buffer.buffer, cursor.offset); const reader = {view, offset: 0} const header = readVarInt(reader); readBitPacked(reader, header, opts.bitWidth, output, 0) diff --git a/lib/datapageV2.ts b/lib/datapageV2.ts index 697b3b78..0cbb8872 100644 --- a/lib/datapageV2.ts +++ b/lib/datapageV2.ts @@ -48,7 +48,7 @@ export const readDefinitionLevelsV2 = (reader: DataReader, // V2 we know the length const values = new Array(daph2.num_values) const bitWidth = getBitWidth(dLevelMax) - let disableEnvelope = daph2.definition_levels_byte_length === 0 + const disableEnvelope = true readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, values, disableEnvelope) return values } diff --git a/lib/reader.ts b/lib/reader.ts index 35d81870..5052eeeb 100644 --- a/lib/reader.ts +++ b/lib/reader.ts @@ -28,6 +28,7 @@ import { GetObjectCommand, HeadObjectCommand, S3Client } from '@aws-sdk/client-s import type { Readable } from 'stream'; import type { Blob } from 'buffer'; import {readDefinitionLevelsV2, readRepetitionLevelsV2} from "./datapageV2"; +import {dataReaderFromCursor} from "./util"; const { getBloomFiltersFor } = bloomFilterReader; @@ -96,7 +97,6 @@ class ParquetCursor { this.columnList ); - // now this one is *@($&ing up, it's dematerializing records this.rowGroup = parquet_shredder.materializeRecords(this.schema, rowBuffer); this.rowGroupIndex++; this.cursorIndex = 0; @@ -943,7 +943,7 @@ async function decodePages(buffer: Buffer, opts: Options) { pageData.values = pageData.values!.map((d) => opts.dictionary![d]); } - const length = pageData.rlevels !== undefined ? pageData.dlevels?.length : 0; + const length = pageData.rlevels !== undefined ? pageData.rlevels?.length : 0; if (pageData.rlevels?.length) { data.rlevels = pageData.rlevels; @@ -1073,12 +1073,6 @@ async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader, }; } -// ensures minimum allocation of ArrayBuffer for the DataView. -function dataViewFromCursor(cursor: Cursor, offset?: number): DataView { - // @ts-ignore - return new DataView(cursor.buffer.buffer, cursor.offset); -} - async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options): Promise> { const cursorEnd = cursor.offset + header.compressed_page_size; const dataPageHeaderV2 = header.data_page_header_v2!; @@ -1088,12 +1082,8 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade const valueEncoding = parquet_util.getThriftEnum(parquet_thrift.Encoding, dataPageHeaderV2.encoding); /* read repetition levels */ - const use_old_rlevels = false; let rLevels: Array; - let reader: DataReader = { - view: dataViewFromCursor(cursor), - offset: 0 - } + let reader = dataReaderFromCursor(cursor, 0) rLevels = readRepetitionLevelsV2(reader, dataPageHeaderV2, opts.rLevelMax || 0); reader.offset = dataPageHeaderV2.repetition_levels_byte_length; diff --git a/lib/util.ts b/lib/util.ts index bfd65e24..28a46fd9 100644 --- a/lib/util.ts +++ b/lib/util.ts @@ -4,6 +4,7 @@ import fs, { WriteStream } from 'fs'; import * as parquet_thrift from '../gen-nodejs/parquet_types'; import { FileMetaDataExt, WriterOptions } from './declare'; import { Int64 } from 'thrift'; +import {Cursor, DataReader} from "./codec/types"; // Use this so users only need to implement the minimal amount of the WriteStream interface export type WriteStreamMinimal = Pick; @@ -232,3 +233,9 @@ export const fieldIndexOf = function (arr: unknown[][], elem: unknown[]) { export const cloneInteger = (int: Int64) => { return new Int64(int.valueOf()); }; + + +export function dataReaderFromCursor(data: Cursor, offset?: number): DataReader { + let view = new DataView(data.buffer.buffer, data.offset); + return { view , offset: offset || 0}; +} diff --git a/test/codec_rle.js b/test/codec_rle.js index f5ff1537..5c7cb770 100644 --- a/test/codec_rle.js +++ b/test/codec_rle.js @@ -8,7 +8,7 @@ function dataViewFromArray(data) { const ab = new ArrayBuffer(data.length,{ maxByteLength: data.length }); let view = new DataView(ab, 0); data.forEach((val,idx) => view.setUint8(idx, val)); - return view; + return view } describe('ParquetCodec::RLE', function () { diff --git a/test/lib/codec/rle.test.ts b/test/lib/codec/rle.test.ts index f22c3de7..fa83b4cd 100644 --- a/test/lib/codec/rle.test.ts +++ b/test/lib/codec/rle.test.ts @@ -1,19 +1,36 @@ -import { expect } from 'chai'; -import { decodeRunBitpacked } from '../../../lib/codec/rle'; -import type { Cursor } from '../../../lib/codec/types'; +import {expect} from 'chai'; +import {decodeRunBitpacked} from '../../../lib/codec/rle'; +import {readRleBitPackedHybrid} from "../../../lib/codec/encoding"; describe('RLE Codec', function () { - describe('#decodeRunBitpacked', function () { - it('can decode a known bitpack value', function () { - const cursor: Cursor = { - // 136, 1, left off the front - buffer: Buffer.from([7, 251, 127, 127, 28, 1, 9, 254, 251, 191, 63]), - offset: 0, - }; - const values = decodeRunBitpacked(cursor, 24, { bitWidth: 1 }); - expect(values.length).equals(24); - }); + it('can decode a known bitpack value', function () { + // 136 = 10001000, 0x80 = 128 or 100000000 + const bitPackedDecBuffer = Buffer.from([136, 1, 7, 251, 127, 127, 28, 1, 9, 254, 251, 191, 63]); + + const myView = new DataView(bitPackedDecBuffer.buffer); + let decoder = new TextDecoder('utf-8'); + const res = bitPackedDecBuffer.map((val, i, ary) => myView.getUint8(i)); + // Answer was `var N1 = Mat` which is part of the compiled parquetjs library. + // Undid some changes and now it's `module.exports = read`... + console.log(decoder.decode(res)); + const reader = {view: myView, offset: 0}; + const values = new Array(26).fill(0); + const expected = [ + 1, 1, 1, 1, 0, 1, 1, 0, + 0, 1, 1, 1, 0, 1, 1, 0, + 1, 1, 0, 0, 1, 1, 1, 0, + 0, 0 + ]; + readRleBitPackedHybrid(reader, 1, 13, values, true); + expect(values.length).equals(expected.length) + // correct? + values.forEach((val, i) => { + expect(val, `${val} != ${expected[i]} for i = ${i}`).equals(expected[i]) + }) + }); + + describe('#decodeRunBitpacked', function () { // use the example from the documentation for RLE/Bitpacked hybrid, // https://parquet.apache.org/docs/file-format/data-pages/encodings/#RLE it('writes and reads bit packed values for documentation example correctly', () => { @@ -26,7 +43,7 @@ describe('RLE Codec', function () { // the number of values? or the number of bits used for encoding a set of values ? const bitPackedRunLength = 8; - const bitPackedScaledRunLength = bitPackedRunLength/8; + const bitPackedScaledRunLength = bitPackedRunLength / 8; // in the grammar it says it's EITHER the left-shifted value OR 1 if the shifted value is 0?? but that makes // no sense and results in an error. const shiftedBPSRL = (bitPackedScaledRunLength << 1) | 1; @@ -41,10 +58,10 @@ describe('RLE Codec', function () { const bitWidth = 3; // number of expected values in the result = 8 - const cursor = { buffer: Buffer.from(view.buffer), offset: 0 } - const values = decodeRunBitpacked(cursor, decVals.length, {bitWidth} ); + const cursor = {buffer: Buffer.from(view.buffer), offset: 0} + const values = decodeRunBitpacked(cursor, decVals.length, {bitWidth}); expect(values.length).equals(decVals.length); - values.forEach((val,i) => { + values.forEach((val, i) => { expect(val).equals(decVals[i]); }) }); From 1c725f77ba13fe36110133ad7b4aeeb2a83a0bd6 Mon Sep 17 00:00:00 2001 From: shannonwells Date: Wed, 13 May 2026 11:05:56 -0700 Subject: [PATCH 6/6] working on RLE failure --- .tool-versions | 2 +- lib/codec/encoding.ts | 2 -- lib/codec/plain_dictionary.ts | 2 +- lib/codec/rle.ts | 60 +++++++++++++++++++++++------------ lib/util.ts | 2 ++ test/test-files.js | 2 +- 6 files changed, 44 insertions(+), 26 deletions(-) diff --git a/.tool-versions b/.tool-versions index adcc73e9..71c0000b 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1 +1 @@ -nodejs 20.20.2 +nodejs 22.17.1 diff --git a/lib/codec/encoding.ts b/lib/codec/encoding.ts index 8e3b37f6..38e464f8 100644 --- a/lib/codec/encoding.ts +++ b/lib/codec/encoding.ts @@ -60,7 +60,6 @@ export function readRleBitPackedHybrid(reader: DataReader, width: number, length if (!disableEnvelope) { reader.offset += 4 } - const startOffset = reader.offset; let seen = 0 while (seen < output.length) { const header = readVarInt(reader) @@ -74,7 +73,6 @@ export function readRleBitPackedHybrid(reader: DataReader, width: number, length seen += count } } - console.assert(reader.offset - startOffset === length) } /** diff --git a/lib/codec/plain_dictionary.ts b/lib/codec/plain_dictionary.ts index d12423ba..34c521d9 100644 --- a/lib/codec/plain_dictionary.ts +++ b/lib/codec/plain_dictionary.ts @@ -11,7 +11,7 @@ export const decodeValues = function (type: string, cursor: Cursor, count: numbe view: new DataView(cursor.buffer.buffer, cursor.offset), offset: 0, } - let output: DecodedArray = new Array(count); + const output: DecodedArray = new Array(count); const disableEnvelope = true; readRleBitPackedHybrid(reader, bitWidth, count, output, disableEnvelope) return output; diff --git a/lib/codec/rle.ts b/lib/codec/rle.ts index 57a1e019..a759f1c4 100644 --- a/lib/codec/rle.ts +++ b/lib/codec/rle.ts @@ -116,27 +116,45 @@ export const encodeValues = function ( // opts: bitWidth is required. // returns: a DecodedArray export function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: number }): Array { - const run_old_code = true; - let output = new Array(count).fill(0); - if (run_old_code) { - if (count % 8 !== 0) { - throw 'must be a multiple of 8'; - } + const output = new Array(count).fill(0); + const bytesNeeded = Math.ceil((opts.bitWidth * count) / 8); + + // const values = new Array(count).fill(0); + // for (let b = 0; b < opts.bitWidth * count; ++b) { + // if (cursor.buffer[cursor.offset + Math.floor(b / 8)] & (1 << (b % 8))) { + // values[Math.floor(b / opts.bitWidth)] |= 1 << (b % opts.bitWidth); + // } + // } + // + // cursor.offset += opts.bitWidth * (count / 8); + // } else { + // const view = new DataView(cursor.buffer.buffer, cursor.offset); + // const reader = {view, offset: 0} + // const header = readVarInt(reader); + // readBitPacked(reader, header, opts.bitWidth, output, 0) + // } + // return output; + // } + + // IMPORTANT: Create DataView with proper offset handling + // Buffer.buffer might have an internal byteOffset we need to account for + const view = new DataView( + cursor.buffer.buffer, + cursor.buffer.byteOffset + cursor.offset, + bytesNeeded + ); + const reader = {view, offset: 0} + // DON'T read a header - we're already past it! + // The header was already consumed by decodeValues + // We just need to decode the bit-packed data directly + + // Create a fake header for the bit-packed run + // count is already the number of values (multiple of 8) + const header = ((count / 8) << 1) | 1; // bit-packed header + const seen = readBitPacked(reader, header, opts.bitWidth, output, 0); + // Update cursor position + cursor.offset += bytesNeeded; - const values = new Array(count).fill(0); - for (let b = 0; b < opts.bitWidth * count; ++b) { - if (cursor.buffer[cursor.offset + Math.floor(b / 8)] & (1 << (b % 8))) { - values[Math.floor(b / opts.bitWidth)] |= 1 << (b % opts.bitWidth); - } - } - - cursor.offset += opts.bitWidth * (count / 8); - } else { - const view = new DataView(cursor.buffer.buffer, cursor.offset); - const reader = {view, offset: 0} - const header = readVarInt(reader); - readBitPacked(reader, header, opts.bitWidth, output, 0) - } return output; } @@ -151,7 +169,7 @@ export function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWid // So maybe this code is fine. export function decodeRunRepeated(cursor: Cursor, count: number, opts: { bitWidth: number }): Array { let output = new Array(count).fill(0); - var bytesNeededForFixedBitWidth = Math.ceil(opts.bitWidth / 8); + const bytesNeededForFixedBitWidth = Math.ceil(opts.bitWidth / 8); let value = 0; for (let i = 0; i < bytesNeededForFixedBitWidth; ++i) { diff --git a/lib/util.ts b/lib/util.ts index 28a46fd9..f7ade1e1 100644 --- a/lib/util.ts +++ b/lib/util.ts @@ -112,7 +112,9 @@ export const getBitWidth = function (val: number) { */ export const getThriftEnum = function (klass: Enums, value: unknown) { for (const k in klass) { + console.log(`Does ${klass[k]} ${value}?`) if (klass[k] === value) { + console.log("FOUND!") return k; } } diff --git a/test/test-files.js b/test/test-files.js index 94d967bb..1be377b5 100644 --- a/test/test-files.js +++ b/test/test-files.js @@ -227,7 +227,7 @@ describe('test-files', function () { describe('RLE', function () { // Tracked in https://github.com/LibertyDSNP/parquetjs/issues/113 - it.skip('rle_boolean_encoding.parquet loads', async function () { + it('rle_boolean_encoding.parquet loads', async function () { const data = await readData('rle/rle_boolean_encoding.parquet'); assert.deepEqual(data[0], { datatype_boolean: true }); assert.deepEqual(data[1], { datatype_boolean: false });