Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nodejs 20.20.2
nodejs 22.17.1
8 changes: 6 additions & 2 deletions LICENSE.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Copyright for portions of the project are (c) 2017 ironSource Ltd. https://github.com/ironSource/parquetjs
Copyright for portions of the project are https://github.com/ZJONSSON/parquetjs
Copyright for portions of the project are https://github.com/LibertyDSNP/parquetjs

Copyright for portions of the project are (c) https://github.com/ZJONSSON/parquetjs

Copyright for portions of the project are (c) https://github.com/LibertyDSNP/parquetjs

Copyright for portions of the project are (c) https://github.com/hyparam/hyparquet

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in the
Expand Down
210 changes: 210 additions & 0 deletions lib/codec/encoding.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
///
/// Modified under the MIT License from
/// https://github.com/hyparam/hyparquet
/// The MIT License (MIT)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import {ParquetType} from "../declare";
import {DataReader, DecodedArray } from "./types";

/**
 * Read a var int (unsigned LEB128) and advance the reader past it.
 *
 * Every byte contributes its 7 low bits, least-significant group first;
 * a byte whose high bit is clear terminates the value.
 *
 * The value is accumulated arithmetically instead of with 32-bit bitwise
 * operators. Bitwise accumulation turns values of 2^31 and above negative
 * and, because shift counts are taken modulo 32, folds every group after
 * the fifth byte back onto the low bits. Results here are exact up to
 * Number.MAX_SAFE_INTEGER (2^53 - 1).
 *
 * @param reader - view plus current offset; offset is advanced by one for every byte consumed
 * @returns the decoded non-negative value
 * @throws Error when more than 10 bytes are chained (longer than any int64 var int)
 * @throws RangeError from DataView when the value runs past the end of the view
 */
export function readVarInt(reader: DataReader): number {
  // 10 bytes is the longest encoding of a 64-bit value.
  const maxBytes = 10
  let result = 0
  let scale = 1 // 2 ** (7 * number of bytes already consumed)
  for (let consumed = 0; consumed < maxBytes; consumed++) {
    const byte = reader.view.getUint8(reader.offset++)
    result += (byte & 0x7f) * scale
    if (!(byte & 0x80)) {
      return result
    }
    scale *= 128
  }
  throw new Error('parquet varint longer than 10 bytes')
}

/**
 * Smallest number of bits able to hold `value`.
 *
 * Computed as 32 minus the count of leading zero bits in the 32-bit
 * representation of the value, so 0 needs 0 bits, 1 needs 1 bit and
 * 255 needs 8 bits.
 *
 * @param value - number to measure
 * @returns bit count in the range 0..32
 */
export function bitWidth(value: number): number {
  const wordBits = 32
  const leadingZeros = Math.clz32(value)
  return wordBits - leadingZeros
}

/**
 * Decode a run-length encoded / bit-packed hybrid stream into `output`.
 *
 * Runs are decoded one after another until `output.length` values have been
 * produced. Every run starts with a var int header:
 * - low bit set: bit-packed run, decoded by readBitPacked
 * - low bit clear: RLE run repeating one value `header >>> 1` times
 *
 * @param reader - view plus current offset; offset is advanced past everything consumed
 * @param width - bit width of each encoded value
 * @param length - currently not used by the decoder; kept so existing call sites keep working
 * @param output - receives the decoded values from index 0; its length decides how many values are decoded
 * @param disableEnvelope - true to start decoding at the current offset; otherwise the
 *   first 4 bytes (the envelope) are skipped without being read
 */
export function readRleBitPackedHybrid(reader: DataReader, width: number, length: number, output: DecodedArray, disableEnvelope?: boolean) {
  if (!disableEnvelope) {
    reader.offset += 4
  }

  let seen = 0
  while (seen < output.length) {
    const header = readVarInt(reader)
    if (header & 1) {
      // bit-packed run: readBitPacked never writes beyond output.length
      seen = readBitPacked(reader, header, width, output, seen)
    } else {
      // RLE run: clamp to the space that is left, otherwise a run longer than
      // the remaining output would grow a plain Array past the requested size
      const count = Math.min(header >>> 1, output.length - seen)
      readRle(reader, count, width, output, seen)
      seen += count
    }
  }
}

/**
 * Decode a single RLE run.
 *
 * Reads one little-endian value stored in the minimum whole number of bytes
 * that holds `bitWidth` bits, then stores that value in `count` consecutive
 * slots of `output`, beginning at index `seen`.
 *
 * @param reader - view plus current offset; offset is advanced past the value bytes
 * @param count - number of slots to fill
 * @param bitWidth - bit width of the stored value
 * @param output - destination array
 * @param seen - index of the first slot to fill
 */
export function readRle(reader: DataReader,
  count: number,
  bitWidth: number,
  output: DecodedArray,
  seen: number) {
  // bytes occupied by the value: bitWidth rounded up to a whole byte
  const byteCount = (bitWidth + 7) >> 3

  // assemble the value, lowest byte first
  let repeated = 0
  let shift = 0
  for (let remaining = byteCount; remaining > 0; remaining--) {
    repeated |= reader.view.getUint8(reader.offset++) << shift
    shift += 8
  }

  // write the value into every slot of the run
  const end = seen + count
  for (let slot = seen; slot < end; slot++) {
    output[slot] = repeated
  }
}

/**
 * Decode one bit-packed run of the RLE / bit-pack hybrid.
 *
 * The run holds `header >>> 1` groups of 8 values. Values are packed
 * back to back, least-significant bit first, and may straddle byte
 * boundaries. Every value of the run is consumed from the reader, but only
 * those that still fit are stored: once `seen` reaches `output.length` the
 * remaining (padding) values are dropped.
 *
 * Pending bits are kept in an arithmetic accumulator instead of a 32-bit
 * bitwise one, so widths up to 32 decode correctly (32-bit values are
 * returned as unsigned numbers). A run that needs no bytes - zero groups, or
 * a bit width of 0 - leaves the reader offset untouched.
 *
 * @param reader - view plus current offset; offset is advanced past the bytes of the run
 * @param header - run header as read from the stream (low bit is the bit-pack flag)
 * @param bitWidth - bits per value, 0 to 32
 * @param output - destination array
 * @param seen - index in `output` at which to store the first value
 * @returns index one past the last value stored, i.e. total output values so far
 * @throws Error when the run should contain data but starts at or beyond the end of the view
 * @throws RangeError from DataView when the run is cut short by the end of the view
 */
export function readBitPacked(reader: DataReader,
  header: number,
  bitWidth: number,
  output: DecodedArray,
  seen: number): number {
  const count = (header >>> 1) * 8 // values in this run
  if (count === 0) {
    return seen
  }

  if (bitWidth === 0) {
    // zero-width values occupy no bytes: every value is 0
    const end = Math.min(seen + count, output.length)
    while (seen < end) {
      output[seen++] = 0
    }
    return seen
  }

  if (reader.offset >= reader.view.byteLength) {
    throw new Error(`parquet bitpack offset ${reader.offset} out of range`)
  }

  const radix = 2 ** bitWidth // one more than the largest storable value
  let pending = 0 // bits read from the stream but not yet emitted, low bits first
  let pendingBits = 0 // number of valid bits in `pending` (at most bitWidth + 7)

  for (let remaining = count; remaining > 0; remaining--) {
    // pull in bytes until a whole value is available
    while (pendingBits < bitWidth) {
      pending += reader.view.getUint8(reader.offset++) * 2 ** pendingBits
      pendingBits += 8
    }

    // peel the low bitWidth bits off the accumulator
    const value = pending % radix
    pending = (pending - value) / radix
    pendingBits -= bitWidth

    if (seen < output.length) {
      output[seen++] = value
    }
  }

  return seen
}

/**
 * Decode BYTE_STREAM_SPLIT data.
 *
 * The input consists of one stream per byte position: stream k holds byte k
 * of every value. The streams are woven back together so that the bytes of
 * each value are contiguous, and the result is reinterpreted according to
 * `type`.
 *
 * @param reader - view plus current offset; offset is advanced by count times the value width
 * @param count - number of values to decode
 * @param type - physical type of the values
 * @param typeLength - value width in bytes, needed for FIXED_LEN_BYTE_ARRAY
 * @returns a typed array for FLOAT, DOUBLE, INT32 and INT64, or an array of
 *   byte views (one per value) for FIXED_LEN_BYTE_ARRAY
 * @throws Error for types that cannot be byte-stream-split
 */
export function byteStreamSplit(reader: DataReader, count: number, type: ParquetType, typeLength: number|undefined) {
  const stride = byteWidth(type, typeLength)
  const interleaved = new Uint8Array(count * stride)

  // stream by stream, drop each byte into its position inside its value
  for (let stream = 0; stream < stride; stream++) {
    for (let valueIndex = 0, target = stream; valueIndex < count; valueIndex++, target += stride) {
      interleaved[target] = reader.view.getUint8(reader.offset++)
    }
  }

  // reinterpret the contiguous bytes according to the physical type
  switch (type) {
    case 'FLOAT':
      return new Float32Array(interleaved.buffer)
    case 'DOUBLE':
      return new Float64Array(interleaved.buffer)
    case 'INT32':
      return new Int32Array(interleaved.buffer)
    case 'INT64':
      return new BigInt64Array(interleaved.buffer)
    case 'FIXED_LEN_BYTE_ARRAY': {
      // one view of `stride` bytes per value, sharing the underlying buffer
      const split = new Array(count)
      for (let valueIndex = 0, start = 0; valueIndex < count; valueIndex++, start += stride) {
        split[valueIndex] = interleaved.subarray(start, start + stride)
      }
      return split
    }
    default:
      throw new Error(`parquet byte_stream_split unsupported type: ${type}`)
  }
}

/**
 * Width in bytes of one value of the given physical type.
 *
 * @param type - physical type
 * @param typeLength - declared width, only consulted for FIXED_LEN_BYTE_ARRAY
 * @returns 4 for INT32 and FLOAT, 8 for INT64 and DOUBLE, `typeLength` for FIXED_LEN_BYTE_ARRAY
 * @throws Error when FIXED_LEN_BYTE_ARRAY comes without a non-zero `typeLength`,
 *   or when the type is none of the above
 */
function byteWidth(type: ParquetType, typeLength: number|undefined): number {
  if (type === 'INT32' || type === 'FLOAT') {
    return 4
  }
  if (type === 'INT64' || type === 'DOUBLE') {
    return 8
  }
  if (type === 'FIXED_LEN_BYTE_ARRAY') {
    if (typeLength) {
      return typeLength
    }
    throw new Error('parquet byteWidth missing type_length')
  }
  throw new Error(`parquet unsupported type: ${type}`)
}
14 changes: 12 additions & 2 deletions lib/codec/plain_dictionary.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import * as rle from './rle';

Check failure on line 1 in lib/codec/plain_dictionary.ts

View workflow job for this annotation

GitHub Actions / test

'rle' is defined but never used. Allowed unused vars must match /^_/u
import { Cursor, Options } from './types';
import {Cursor, DataReader, DecodedArray, Options} from './types'
import {readRleBitPackedHybrid} from "./encoding";

export const decodeValues = function (type: string, cursor: Cursor, count: number, opts: Options) {

Check failure on line 5 in lib/codec/plain_dictionary.ts

View workflow job for this annotation

GitHub Actions / test

'opts' is defined but never used. Allowed unused args must match /^_/u
const bitWidth = cursor.buffer.subarray(cursor.offset, cursor.offset + 1).readInt8(0);
cursor.offset += 1;
return rle.decodeValues(type, cursor, count, Object.assign({}, opts, { disableEnvelope: true, bitWidth }));
// old:
// return rle.decodeValues(type, cursor, count, Object.assign({}, opts, { disableEnvelope: true, bitWidth }));
const reader: DataReader = {
view: new DataView(cursor.buffer.buffer, cursor.offset),
offset: 0,
}
const output: DecodedArray = new Array(count);
const disableEnvelope = true;
readRleBitPackedHybrid(reader, bitWidth, count, output, disableEnvelope)
return output;
};
109 changes: 78 additions & 31 deletions lib/codec/rle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
// https://github.com/apache/parquet-format/blob/master/Encodings.md

import varint from 'varint';
import { Cursor } from './types';
import {Cursor} from './types';
import {readBitPacked, readRle, readRleBitPackedHybrid, readVarInt} from "./encoding";

Check failure on line 7 in lib/codec/rle.ts

View workflow job for this annotation

GitHub Actions / test

'readVarInt' is defined but never used. Allowed unused vars must match /^_/u

Check failure on line 7 in lib/codec/rle.ts

View workflow job for this annotation

GitHub Actions / test

'readRleBitPackedHybrid' is defined but never used. Allowed unused vars must match /^_/u

Check failure on line 7 in lib/codec/rle.ts

View workflow job for this annotation

GitHub Actions / test

'readRle' is defined but never used. Allowed unused vars must match /^_/u

function encodeRunBitpacked(values: number[], opts: { bitWidth: number }) {
for (let i = 0; i < values.length % 8; i++) {
Expand Down Expand Up @@ -44,9 +45,11 @@

export const encodeValues = function (
type: string,
values: number[],
opts: { bitWidth: number; disableEnvelope?: boolean }
) {
values: Array<number>,

Check failure on line 48 in lib/codec/rle.ts

View workflow job for this annotation

GitHub Actions / test

Array type using 'Array<number>' is forbidden. Use 'number[]' instead
opts: {
bitWidth: number,
disableEnvelope?: boolean
}) {
if (!('bitWidth' in opts)) {
throw new Error('bitWidth is required');
}
Expand Down Expand Up @@ -105,23 +108,67 @@
return envelope;
};

function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: number }) {
if (count % 8 !== 0) {
throw new Error('must be a multiple of 8');
}

const values = new Array(count).fill(0);
for (let b = 0; b < opts.bitWidth * count; ++b) {
if (cursor.buffer[cursor.offset + Math.floor(b / 8)] & (1 << (b % 8))) {
values[Math.floor(b / opts.bitWidth)] |= 1 << (b % opts.bitWidth);
}
}

cursor.offset += opts.bitWidth * (count / 8);
return values;
// opts.bitWidth is undefined when the boolean values are being passed
// decode a bitpacked value
// setting old code to true here only results in the RLE/bitpacked hybrid test failing, so we know that code is bad.
// cursor: Cursor containing the data to be decoded
// count: the number of values expected to result from the decoding
// opts: bitWidth is required.
// returns: a DecodedArray
export function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: number }): Array<number> {

Check failure on line 118 in lib/codec/rle.ts

View workflow job for this annotation

GitHub Actions / test

Array type using 'Array<number>' is forbidden. Use 'number[]' instead
const output = new Array(count).fill(0);
const bytesNeeded = Math.ceil((opts.bitWidth * count) / 8);

// const values = new Array(count).fill(0);
// for (let b = 0; b < opts.bitWidth * count; ++b) {
// if (cursor.buffer[cursor.offset + Math.floor(b / 8)] & (1 << (b % 8))) {
// values[Math.floor(b / opts.bitWidth)] |= 1 << (b % opts.bitWidth);
// }
// }
//
// cursor.offset += opts.bitWidth * (count / 8);
// } else {
// const view = new DataView(cursor.buffer.buffer, cursor.offset);
// const reader = {view, offset: 0}
// const header = readVarInt(reader);
// readBitPacked(reader, header, opts.bitWidth, output, 0)
// }
// return output;
// }

// IMPORTANT: Create DataView with proper offset handling
// Buffer.buffer might have an internal byteOffset we need to account for
const view = new DataView(
cursor.buffer.buffer,
cursor.buffer.byteOffset + cursor.offset,
bytesNeeded
);
const reader = {view, offset: 0}
// DON'T read a header - we're already past it!
// The header was already consumed by decodeValues
// We just need to decode the bit-packed data directly

// Create a fake header for the bit-packed run
// count is already the number of values (multiple of 8)
const header = ((count / 8) << 1) | 1; // bit-packed header
const seen = readBitPacked(reader, header, opts.bitWidth, output, 0);

Check failure on line 154 in lib/codec/rle.ts

View workflow job for this annotation

GitHub Actions / test

'seen' is assigned a value but never used. Allowed unused vars must match /^_/u
// Update cursor position
cursor.offset += bytesNeeded;

return output;
}

function decodeRunRepeated(cursor: Cursor, count: number, opts: { bitWidth: number }) {
// decode an RLE value
// Note that the RLE encoding method is only supported for the following types of data:
//
// Repetition and definition levels
// Dictionary indices
// Boolean values in data pages, as an alternative to PLAIN encoding
// See https://parquet.apache.org/docs/file-format/data-pages/encodings/
// setting this to run old code lets the RLE/bitpacked hybrid documentation example still pass.
// So maybe this code is fine.
export function decodeRunRepeated(cursor: Cursor, count: number, opts: { bitWidth: number }): Array<number> {

Check failure on line 170 in lib/codec/rle.ts

View workflow job for this annotation

GitHub Actions / test

Array type using 'Array<number>' is forbidden. Use 'number[]' instead
let output = new Array(count).fill(0);
const bytesNeededForFixedBitWidth = Math.ceil(opts.bitWidth / 8);
let value = 0;

Expand All @@ -133,26 +180,26 @@
cursor.offset += 1;
}

return new Array(count).fill(value);
output = new Array(count).fill(value);
return output;
}

export const decodeValues = function (
_: string,
cursor: Cursor,
count: number,
opts: { bitWidth: number; disableEnvelope?: boolean }
) {
// cursor: contains buffer + offset for data
// count: the number of items expected to decode
// opts: must include bitWidth, disableEnvelope is optional, specify true to use all bytes, false to skip first four
// bytes
export const decodeValues = function (_: string, cursor: Cursor, count: number, opts: {
bitWidth: number,
disableEnvelope?: boolean
}) {
if (!('bitWidth' in opts)) {
throw new Error('bitWidth is required');
}

let values = [];
let res;
if (!opts.disableEnvelope) {
cursor.offset += 4;
}

let values = [];
let res;

while (values.length < count) {
const header = varint.decode(cursor.buffer, cursor.offset);
cursor.offset += varint.encodingLength(header);
Expand Down
Loading
Loading