Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 164 additions & 15 deletions bolt/dwio/parquet/arrow/ColumnWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <limits>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -93,6 +96,24 @@ using util::CodecOptions;

namespace {

// Raises a ParquetException stating that `size` cannot be stored in the int32
// size field of a Parquet PageHeader. `sizeName` is placed at the front of the
// message to identify which size was rejected. Never returns.
[[noreturn]] void ThrowPageHeaderSizeError(
    std::string_view sizeName,
    int64_t size) {
  static constexpr char kReason[] =
      " page size cannot be represented in a Parquet PageHeader int32 "
      "field: ";
  const std::string label(sizeName);
  throw ParquetException(label, kReason, size);
}

// Narrows `size` to int32_t for use in a Parquet PageHeader size field.
// Values that are negative or larger than INT32_MAX are rejected by calling
// ThrowPageHeaderSizeError(sizeName, size), which does not return.
inline int32_t CheckPageHeaderSize(std::string_view sizeName, int64_t size) {
  constexpr int64_t kMaxPageHeaderSize = std::numeric_limits<int32_t>::max();
  const bool representable = size >= 0 && size <= kMaxPageHeaderSize;
  if (ARROW_PREDICT_FALSE(!representable)) {
    ThrowPageHeaderSizeError(sizeName, size);
  }
  return static_cast<int32_t>(size);
}

// Visitor that extracts the value buffer from a FlatArray at a given offset.
struct ValueBufferSlicer {
template <typename T>
Expand Down Expand Up @@ -350,25 +371,30 @@ class SerializedPageWriter : public PageWriter {
dict_page_header.__set_is_sorted(page.is_sorted());

const uint8_t* output_data_buffer = compressed_data->data();
int32_t output_data_len = static_cast<int32_t>(compressed_data->size());
int64_t output_data_len = compressed_data->size();
const int32_t uncompressed_page_size =
CheckPageHeaderSize("Uncompressed dictionary", uncompressed_size);
int32_t compressed_page_size =
CheckPageHeaderSize("Compressed dictionary", output_data_len);

if (data_encryptor_.get()) {
UpdateEncryption(encryption::kDictionaryPage);
PARQUET_THROW_NOT_OK(encryption_buffer_->Resize(
data_encryptor_->CiphertextSizeDelta() + output_data_len, false));
data_encryptor_->CiphertextSizeDelta() + compressed_page_size,
false));
output_data_len = data_encryptor_->Encrypt(
compressed_data->data(),
output_data_len,
compressed_page_size,
encryption_buffer_->mutable_data());
output_data_buffer = encryption_buffer_->data();
compressed_page_size =
CheckPageHeaderSize("Compressed dictionary", output_data_len);
}

format::PageHeader page_header;
page_header.__set_type(format::PageType::DICTIONARY_PAGE);
page_header.__set_uncompressed_page_size(
static_cast<int32_t>(uncompressed_size));
page_header.__set_compressed_page_size(
static_cast<int32_t>(output_data_len));
page_header.__set_uncompressed_page_size(uncompressed_page_size);
page_header.__set_compressed_page_size(compressed_page_size);
page_header.__set_dictionary_page_header(dict_page_header);
if (page_checksum_verification_) {
uint32_t crc32 =
Expand Down Expand Up @@ -452,24 +478,29 @@ class SerializedPageWriter : public PageWriter {
const int64_t uncompressed_size = page.uncompressed_size();
std::shared_ptr<Buffer> compressed_data = page.buffer();
const uint8_t* output_data_buffer = compressed_data->data();
int32_t output_data_len = static_cast<int32_t>(compressed_data->size());
int64_t output_data_len = compressed_data->size();
const int32_t uncompressed_page_size =
CheckPageHeaderSize("Uncompressed data", uncompressed_size);
int32_t compressed_page_size =
CheckPageHeaderSize("Compressed data", output_data_len);

if (data_encryptor_.get()) {
PARQUET_THROW_NOT_OK(encryption_buffer_->Resize(
data_encryptor_->CiphertextSizeDelta() + output_data_len, false));
data_encryptor_->CiphertextSizeDelta() + compressed_page_size,
false));
UpdateEncryption(encryption::kDataPage);
output_data_len = data_encryptor_->Encrypt(
compressed_data->data(),
output_data_len,
compressed_page_size,
encryption_buffer_->mutable_data());
output_data_buffer = encryption_buffer_->data();
compressed_page_size =
CheckPageHeaderSize("Compressed data", output_data_len);
}

format::PageHeader page_header;
page_header.__set_uncompressed_page_size(
static_cast<int32_t>(uncompressed_size));
page_header.__set_compressed_page_size(
static_cast<int32_t>(output_data_len));
page_header.__set_uncompressed_page_size(uncompressed_page_size);
page_header.__set_compressed_page_size(compressed_page_size);

if (page_checksum_verification_) {
uint32_t crc32 =
Expand Down Expand Up @@ -2754,8 +2785,56 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
ARROW_UNSUPPORTED();
}

// Byte budget used when deciding where to split a BYTE_ARRAY write: the
// configured data_pagesize_, capped at INT32_MAX minus 64 MiB of slack.
// NOTE(review): the slack presumably leaves headroom so the finished page
// still fits the int32 PageHeader size fields — confirm.
constexpr int64_t kDataPageSizeSlack = 64L * 1024 * 1024;
const int64_t dataPageByteLimit = std::min<int64_t>(
data_pagesize_, std::numeric_limits<int32_t>::max() - kDataPageSizeSlack);

// Returns the length of the value at `index` in `array`, widened to int64_t.
// Binary-like arrays are read through BinaryArray; every other array is
// asserted (DCHECK) to be large-binary-like and read through LargeBinaryArray.
auto valueLength = [&](int64_t index) {
if (::arrow::is_binary_like(array.type_id())) {
return static_cast<int64_t>(
checked_cast<const ::arrow::BinaryArray&>(array).value_length(index));
}
DCHECK(::arrow::is_large_binary_like(array.type_id()));
return static_cast<int64_t>(
checked_cast<const ::arrow::LargeBinaryArray&>(array).value_length(
index));
};

// Returns the total byte length of the `count` consecutive values that begin
// at `start`, computed as value_offset(start + count) - value_offset(start)
// rather than by summing individual value lengths. Handles binary-like arrays
// and (asserted via DCHECK) large-binary-like arrays.
auto valueRangeByteLength = [&](int64_t start, int64_t count) {
if (::arrow::is_binary_like(array.type_id())) {
const auto& binaryArray =
checked_cast<const ::arrow::BinaryArray&>(array);
return static_cast<int64_t>(
binaryArray.value_offset(start + count) -
binaryArray.value_offset(start));
}
DCHECK(::arrow::is_large_binary_like(array.type_id()));
const auto& largeBinaryArray =
checked_cast<const ::arrow::LargeBinaryArray&>(array);
return static_cast<int64_t>(
largeBinaryArray.value_offset(start + count) -
largeBinaryArray.value_offset(start));
};

// Reports whether the level at `levelIndex` has a corresponding slot in the
// values array. With no def levels, or a maximum def level of 0, every level
// does; otherwise the level's def level must be at least
// level_info_.repeated_ancestor_def_level.
auto hasSpacedValue = [&](int64_t levelIndex) {
if (def_levels == nullptr || level_info_.def_level == 0) {
return true;
}
return def_levels[levelIndex] >= level_info_.repeated_ancestor_def_level;
};

// Reports whether the entry identified by (`levelIndex`, `valueIndex`) is a
// non-null value: when def levels are present the level must equal
// level_info_.def_level, and the array slot at `valueIndex` must be valid.
auto hasNonNullValue = [&](int64_t levelIndex, int64_t valueIndex) {
if (def_levels != nullptr &&
def_levels[levelIndex] != level_info_.def_level) {
return false;
}
return array.IsValid(valueIndex);
};

int64_t value_offset = 0;
auto WriteChunk = [&](int64_t offset, int64_t batch_size, bool check_page) {
auto WriteSubChunk = [&](int64_t offset,
int64_t batch_size,
bool check_page) {
int64_t batch_num_values = 0;
int64_t batch_num_spaced_values = 0;
int64_t null_count = 0;
Expand Down Expand Up @@ -2790,6 +2869,76 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
value_offset += batch_num_spaced_values;
};

auto WriteChunk = [&](int64_t offset, int64_t batch_size, bool check_page) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When int32_t overflows, the row group size has already far exceeded the default 128 MB limit, and the modified WriteChunk still cannot solve this problem.

In addition, the Parquet writer expects the written batch size to be 40 MB, as referenced here:

uint64_t writeBatchBytes = 40 * 1024 * 1024; // 40M

Can we solve this issue by modifying the logic in splitWriteRecordBatch? This would avoid both excessively large page sizes and excessively large row group sizes, while making the change more controlled.

@Weixin-Xu Weixin-Xu Jun 1, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem fixed here is mainly about the encoded data page size. writeBatchBytes limits the input batch before Parquet encoding/compression, while this issue happens after plain BYTE_ARRAY values are appended into the encoder. Therefore, even with a 40MB write batch limit, a single encoded data page can still become oversized.
The int32 guard here is only a safety check for invalid Parquet PageHeader sizes. The main fix is to split plain BYTE_ARRAY writes before appending too much encoded data into the page encoder. We can improve splitWriteRecordBatch separately for row group/input batch control, but I think the page-level split is still needed.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you check whether this has any performance impact and provide benchmark results?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can check the performance impact for normal cases.

For the overflow case, benchmark results may not be very meaningful since the original behavior could already write corrupted data when the page size exceeds int32_t. I will mainly verify that regular BYTE_ARRAY writes are not noticeably affected.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to verify the performance when the page size is normal and the WriteChunk method does not hit the fast path but enters the while loop.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current change keeps a fast path for the common flat/non-null case when the estimated encoded size is within the page limit.

I will add benchmark results for the normal page-size case where WriteChunk cannot take this fast path and falls back to the while-loop logic, e.g. with def levels / nulls.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added benchmarks for the normal page-size cases, including plain-encoded VARCHAR and nested ARRAY.

The benchmark results did reveal unexpected overhead in the common case, so I moved the fast-path check earlier: we now first use Arrow offsets to conservatively estimate whether the whole batch can fit in the current page. If it can, we write the batch directly and avoid entering the per-level while loop. The precise per-level splitting loop is only used when the batch is close to the page-size limit.

This keeps the oversized-page fix from affecting unrelated/common write paths while still preserving the correctness guard for pages that may exceed the Parquet PageHeader int32 size limit.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still see some residual performance risk in the cases where the fast path is not applicable, especially nullable VARCHAR/VARBINARY and nested ARRAY. In those paths, WriteChunk enters the per-level loop and then WriteSubChunk scans levels again via MaybeCalculateValidityBits / WriteLevelsSpaced, so the no-split normal case may pay an extra full scan.

The current Bolt Parquet writer has no performance advantage over the Java writer for string types. I think checking whether data_pagesize_ is exceeded at every level is too strict and may introduce performance issues.

We can refer to Arrow’s implementation: https://github.com/apache/arrow/blob/main/cpp/src/parquet/column_writer.cc. Arrow checks whether int32_t overflows, but it does not check the page size level by level.

const bool split_by_byte_size =
check_page && !IsDictionaryEncoding(current_encoder_->encoding());
if (!split_by_byte_size) {
WriteSubChunk(offset, batch_size, check_page);
return;
}

if (def_levels == nullptr || level_info_.def_level == 0) {
const int64_t batch_encoded_bytes =
valueRangeByteLength(value_offset, batch_size) +
batch_size * static_cast<int64_t>(sizeof(uint32_t));
if (current_encoder_->EstimatedDataEncodedSize() + batch_encoded_bytes <=
dataPageByteLimit) {
WriteSubChunk(offset, batch_size, check_page);
return;
}
}

int64_t local_offset = offset;
int64_t remaining = batch_size;
while (remaining > 0) {
int64_t subchunk_levels = 0;
int64_t subchunk_spaced_values = 0;
int64_t subchunk_encoded_bytes = 0;

while (subchunk_levels < remaining) {
const int64_t level_index = local_offset + subchunk_levels;
const bool can_break_before_level =
!pages_change_on_record_boundaries() || rep_levels == nullptr ||
rep_levels[level_index] == 0;

int64_t value_bytes = 0;
if (hasSpacedValue(level_index)) {
const int64_t value_index = value_offset + subchunk_spaced_values;
if (hasNonNullValue(level_index, value_index)) {
value_bytes = static_cast<int64_t>(sizeof(uint32_t)) +
valueLength(value_index);
}
}

if (check_page && split_by_byte_size && can_break_before_level &&
current_encoder_->EstimatedDataEncodedSize() +
subchunk_encoded_bytes + value_bytes >
dataPageByteLimit) {
if (subchunk_levels == 0) {
if (num_buffered_values_ > 0) {
AddDataPage();
}
} else {
break;
}
}

if (hasSpacedValue(level_index)) {
++subchunk_spaced_values;
}
subchunk_encoded_bytes += value_bytes;
++subchunk_levels;
}

if (subchunk_levels == 0) {
throw ParquetException("Unable to split BYTE_ARRAY write chunk");
}
WriteSubChunk(local_offset, subchunk_levels, check_page);
local_offset += subchunk_levels;
remaining -= subchunk_levels;
}
};

PARQUET_CATCH_NOT_OK(DoInBatches(
def_levels,
rep_levels,
Expand Down
60 changes: 53 additions & 7 deletions bolt/dwio/parquet/tests/writer/ParquetWriterBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "bolt/dwio/parquet/writer/Writer.h"
#include "bolt/exec/tests/utils/TempDirectoryPath.h"

#include <filesystem>

#include <folly/Benchmark.h>
#include <folly/init/Init.h>
using namespace bytedance::bolt;
Expand Down Expand Up @@ -65,17 +67,18 @@ class ParquetWriterBenchmark {

~ParquetWriterBenchmark() {}

void writeToFile(
uint64_t writeToFile(
const std::vector<RowVectorPtr>& batches,
bool /*forRowGroupSkip*/) {
for (auto& batch : batches) {
writer_->write(batch);
}
writer_->flush();
writer_->close();
return std::filesystem::file_size(fileFolder_->path + "/" + fileName_);
}

void writeSingleColumn(
uint64_t writeSingleColumn(
const std::string& columnName,
const TypePtr& type,
uint8_t nullsRateX100,
Expand All @@ -89,7 +92,7 @@ class ParquetWriterBenchmark {
.withNullsForField(Subfield(columnName), nullsRateX100)
.build();
suspender.dismiss();
writeToFile(*batches, true);
return writeToFile(*batches, true);
}

private:
Expand All @@ -105,16 +108,18 @@ class ParquetWriterBenchmark {
};

void run(
uint32_t,
uint32_t iterations,
const std::string& columnName,
const TypePtr& type,
uint8_t nullsRateX100,
uint32_t batchSize,
bool disableDictionary) {
RowTypePtr rowType = ROW({columnName}, {type});
ParquetWriterBenchmark benchmark(disableDictionary, rowType);
BIGINT()->toString();
benchmark.writeSingleColumn(columnName, type, nullsRateX100, batchSize);
for (uint32_t i = 0; i < iterations; ++i) {
ParquetWriterBenchmark benchmark(disableDictionary, rowType);
folly::doNotOptimizeAway(benchmark.writeSingleColumn(
columnName, type, nullsRateX100, batchSize));
}
}

#define PARQUET_BENCHMARKS_NULLS(_type_, _name_, _null_) \
Expand All @@ -131,6 +136,41 @@ void run(
#define PARQUET_BENCHMARKS(_type_, _name_) \
PARQUET_BENCHMARKS_NULLS(_type_, _name_, 20)

// Benchmarks targeting the BYTE_ARRAY non-dictionary write path that is
// affected by the oversized parquet data page fix. The data sizes here are
// well below the int32 page-size limit, so these benchmarks measure the
// overhead introduced for the common (non-oversized) case.
// Expands to three `run` benchmarks for `_type_` with null rate `_null_`, at
// batch sizes 4096, 32768 and 262144, each passing `true` as the final `run`
// argument (disableDictionary), followed by a separator line in the output.
#define PARQUET_BENCHMARKS_NULLS_NO_DICT(_type_, _name_, _null_) \
BENCHMARK_NAMED_PARAM( \
run, \
_name_##_batch_4k_no_dict_null##_null_, \
#_name_, \
_type_, \
_null_, \
4096, \
true); \
BENCHMARK_NAMED_PARAM( \
run, \
_name_##_batch_32k_no_dict_null##_null_, \
#_name_, \
_type_, \
_null_, \
32768, \
true); \
BENCHMARK_NAMED_PARAM( \
run, \
_name_##_batch_256k_no_dict_null##_null_, \
#_name_, \
_type_, \
_null_, \
262144, \
true); \
BENCHMARK_DRAW_LINE();

// Instantiates the dictionary-disabled benchmark set for `_type_` twice: once
// with a null rate argument of 0 and once with 20.
#define PARQUET_BENCHMARKS_NO_DICT(_type_, _name_) \
PARQUET_BENCHMARKS_NULLS_NO_DICT(_type_, _name_, 0) \
PARQUET_BENCHMARKS_NULLS_NO_DICT(_type_, _name_, 20)

PARQUET_BENCHMARKS(VARCHAR(), Varchar);
PARQUET_BENCHMARKS(BIGINT(), BigInt);
PARQUET_BENCHMARKS(DOUBLE(), Double);
Expand All @@ -139,6 +179,12 @@ PARQUET_BENCHMARKS(DECIMAL(38, 3), LongDecimalType);
PARQUET_BENCHMARKS(MAP(BIGINT(), BIGINT()), Map);
PARQUET_BENCHMARKS(ARRAY(BIGINT()), List);

// Plain-encoded VARCHAR exercises the new BYTE_ARRAY page-splitting code path
// added by the oversized-page fix. The nested ARRAY<VARCHAR> case additionally
// drives the per-level loop branch (def_levels != nullptr && def_level > 0).
PARQUET_BENCHMARKS_NO_DICT(VARCHAR(), VarcharPlain);
PARQUET_BENCHMARKS_NO_DICT(ARRAY(VARCHAR()), VarcharListPlain);

// TODO: Add all data types

int main(int argc, char** argv) {
Expand Down
Loading
Loading