Skip to content

Commit 65310aa

Browse files
committed
update
1 parent cc15bf1 commit 65310aa

6 files changed

Lines changed: 234 additions & 19 deletions

File tree

cpp/src/parquet/bloom_filter.cc

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "parquet/bloom_filter.h"
3131
#include "parquet/encryption/encryption_internal.h"
3232
#include "parquet/encryption/internal_file_decryptor.h"
33+
#include "parquet/encryption/internal_file_encryptor.h"
3334
#include "parquet/exception.h"
3435
#include "parquet/thrift_internal.h"
3536
#include "parquet/xxhasher.h"
@@ -320,31 +321,73 @@ BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
320321
return bloom_filter;
321322
}
322323

323-
void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
324-
DCHECK(sink != nullptr);
324+
namespace {
325325

326+
format::BloomFilterHeader BuildBloomFilterHeader(
327+
BloomFilter::Algorithm algorithm, BloomFilter::HashStrategy hash_strategy,
328+
BloomFilter::CompressionStrategy compression_strategy, uint32_t num_bytes) {
326329
format::BloomFilterHeader header;
327-
if (ARROW_PREDICT_FALSE(algorithm_ != BloomFilter::Algorithm::BLOCK)) {
330+
if (ARROW_PREDICT_FALSE(algorithm != BloomFilter::Algorithm::BLOCK)) {
328331
throw ParquetException("BloomFilter does not support Algorithm other than BLOCK");
329332
}
330333
header.algorithm.__set_BLOCK(format::SplitBlockAlgorithm());
331-
if (ARROW_PREDICT_FALSE(hash_strategy_ != HashStrategy::XXHASH)) {
334+
if (ARROW_PREDICT_FALSE(hash_strategy != BloomFilter::HashStrategy::XXHASH)) {
332335
throw ParquetException("BloomFilter does not support Hash other than XXHASH");
333336
}
334337
header.hash.__set_XXHASH(format::XxHash());
335-
if (ARROW_PREDICT_FALSE(compression_strategy_ != CompressionStrategy::UNCOMPRESSED)) {
338+
if (ARROW_PREDICT_FALSE(compression_strategy !=
339+
BloomFilter::CompressionStrategy::UNCOMPRESSED)) {
336340
throw ParquetException(
337341
"BloomFilter does not support Compression other than UNCOMPRESSED");
338342
}
339343
header.compression.__set_UNCOMPRESSED(format::Uncompressed());
340-
header.__set_numBytes(num_bytes_);
344+
header.__set_numBytes(num_bytes);
345+
return header;
346+
}
347+
348+
} // namespace
349+
350+
void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
351+
DCHECK(sink != nullptr);
352+
353+
format::BloomFilterHeader header = BuildBloomFilterHeader(
354+
algorithm_, hash_strategy_, compression_strategy_, num_bytes_);
341355

342356
ThriftSerializer serializer;
343357
serializer.Serialize(&header, sink);
344358

345359
PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
346360
}
347361

362+
void BlockSplitBloomFilter::WriteEncrypted(ArrowOutputStream* sink, Encryptor* encryptor,
363+
int16_t row_group_ordinal,
364+
int16_t column_ordinal) const {
365+
DCHECK(sink != nullptr);
366+
if (encryptor == nullptr) {
367+
throw ParquetException("Bloom filter encryptor must be provided");
368+
}
369+
370+
format::BloomFilterHeader header = BuildBloomFilterHeader(
371+
algorithm_, hash_strategy_, compression_strategy_, num_bytes_);
372+
373+
// Bloom filter header and bitset are separate encrypted modules with different AADs.
374+
encryptor->UpdateAad(
375+
encryption::CreateModuleAad(encryptor->file_aad(), encryption::kBloomFilterHeader,
376+
row_group_ordinal, column_ordinal, -1));
377+
ThriftSerializer serializer;
378+
serializer.Serialize(&header, sink, encryptor);
379+
380+
encryptor->UpdateAad(
381+
encryption::CreateModuleAad(encryptor->file_aad(), encryption::kBloomFilterBitset,
382+
row_group_ordinal, column_ordinal, -1));
383+
auto cipher_buffer =
384+
AllocateBuffer(encryptor->pool(), encryptor->CiphertextLength(num_bytes_));
385+
std::span<const uint8_t> bitset_span(data_->data(), num_bytes_);
386+
int32_t cipher_buffer_len =
387+
encryptor->Encrypt(bitset_span, cipher_buffer->mutable_span_as<uint8_t>());
388+
PARQUET_THROW_NOT_OK(sink->Write(cipher_buffer->data(), cipher_buffer_len));
389+
}
390+
348391
bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
349392
const uint32_t bucket_index =
350393
static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);

cpp/src/parquet/bloom_filter.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,20 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
276276
void InsertHash(uint64_t hash) override;
277277
void InsertHashes(const uint64_t* hashes, int num_values) override;
278278
void WriteTo(ArrowOutputStream* sink) const override;
279+
280+
/// Serialize this Bloom filter as two encrypted modules (header and bitset)
281+
/// using the supplied metadata encryptor.
282+
///
283+
/// The same encryptor is used for both modules, switching the AAD between
284+
/// kBloomFilterHeader and kBloomFilterBitset before each encryption.
285+
///
286+
/// @param sink The output stream to write to.
287+
/// @param encryptor Metadata encryptor for this column. Must not be null.
288+
/// @param row_group_ordinal Ordinal of the row group containing this Bloom filter.
289+
/// @param column_ordinal Ordinal of the column containing this Bloom filter.
290+
void WriteEncrypted(ArrowOutputStream* sink, Encryptor* encryptor,
291+
int16_t row_group_ordinal, int16_t column_ordinal) const;
292+
279293
uint32_t GetBitsetSize() const override { return num_bytes_; }
280294

281295
uint64_t Hash(int32_t value) const override { return hasher_->Hash(value); }

cpp/src/parquet/bloom_filter_writer.cc

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "arrow/util/bit_run_reader.h"
2727
#include "arrow/util/checked_cast.h"
2828

29+
#include "parquet/encryption/internal_file_encryptor.h"
2930
#include "parquet/exception.h"
3031
#include "parquet/metadata.h"
3132
#include "parquet/properties.h"
@@ -151,12 +152,15 @@ namespace {
151152

152153
/// \brief A concrete implementation of BloomFilterBuilder.
153154
///
154-
/// \note Column encryption for bloom filter is not implemented yet.
155+
/// When `file_encryptor` is provided, bloom filters of encrypted columns are
156+
/// serialized using the column's metadata encryptor, bloom filters of
157+
/// unencrypted columns are serialized in plaintext.
155158
class BloomFilterBuilderImpl : public BloomFilterBuilder {
156159
public:
157160
BloomFilterBuilderImpl(const SchemaDescriptor* schema,
158-
const WriterProperties* properties)
159-
: schema_(schema), properties_(properties) {}
161+
const WriterProperties* properties,
162+
InternalFileEncryptor* file_encryptor)
163+
: schema_(schema), properties_(properties), file_encryptor_(file_encryptor) {}
160164

161165
void AppendRowGroup() override;
162166

@@ -183,6 +187,7 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {
183187

184188
const SchemaDescriptor* schema_;
185189
const WriterProperties* properties_;
190+
InternalFileEncryptor* file_encryptor_;
186191
bool finished_ = false;
187192

188193
using RowGroupBloomFilters =
@@ -225,14 +230,42 @@ IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink)
225230
}
226231
finished_ = true;
227232

233+
// Bloom filter ordinals are encoded as int16 in the AAD when encryption is enabled.
234+
constexpr size_t kEncryptedOrdinalLimit = std::numeric_limits<int16_t>::max(); // 32767
235+
228236
IndexLocations locations;
229237

230238
for (size_t i = 0; i != bloom_filters_.size(); ++i) {
231239
auto& row_group_bloom_filters = bloom_filters_[i];
232240
for (const auto& [column_id, filter] : row_group_bloom_filters) {
233241
// TODO(GH-43138): Determine the quality of bloom filter before writing it.
234242
PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
235-
filter->WriteTo(sink);
243+
244+
const auto column_path = schema_->Column(column_id)->path()->ToDotString();
245+
std::shared_ptr<Encryptor> meta_encryptor =
246+
file_encryptor_ != nullptr
247+
? file_encryptor_->GetColumnMetaEncryptor(column_path)
248+
: nullptr;
249+
if (meta_encryptor != nullptr) {
250+
if (ARROW_PREDICT_FALSE(i > kEncryptedOrdinalLimit)) {
251+
throw ParquetException(
252+
"Encrypted files cannot contain more than 32767 row groups");
253+
}
254+
if (ARROW_PREDICT_FALSE(static_cast<size_t>(column_id) >
255+
kEncryptedOrdinalLimit)) {
256+
throw ParquetException(
257+
"Encrypted files cannot contain more than 32767 columns");
258+
}
259+
auto* block_filter = dynamic_cast<BlockSplitBloomFilter*>(filter.get());
260+
if (block_filter == nullptr) {
261+
throw ParquetException(
262+
"Only BlockSplitBloomFilter is supported for encrypted bloom filters");
263+
}
264+
block_filter->WriteEncrypted(sink, meta_encryptor.get(), static_cast<int16_t>(i),
265+
static_cast<int16_t>(column_id));
266+
} else {
267+
filter->WriteTo(sink);
268+
}
236269
PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell());
237270

238271
if (pos - offset > std::numeric_limits<int32_t>::max()) {
@@ -253,8 +286,9 @@ IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink)
253286
} // namespace
254287

255288
std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
256-
const SchemaDescriptor* schema, const WriterProperties* properties) {
257-
return std::make_unique<BloomFilterBuilderImpl>(schema, properties);
289+
const SchemaDescriptor* schema, const WriterProperties* properties,
290+
InternalFileEncryptor* file_encryptor) {
291+
return std::make_unique<BloomFilterBuilderImpl>(schema, properties, file_encryptor);
258292
}
259293

260294
} // namespace parquet

cpp/src/parquet/bloom_filter_writer.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "arrow/type_fwd.h"
2121

2222
#include "parquet/bloom_filter.h"
23+
#include "parquet/encryption/type_fwd.h"
2324
#include "parquet/index_location.h"
2425
#include "parquet/type_fwd.h"
2526

@@ -71,8 +72,12 @@ class PARQUET_EXPORT BloomFilterBuilder {
7172
/// \param schema The schema of the file and it must outlive the created builder.
7273
/// \param properties Properties to get bloom filter options. It must outlive the
7374
/// created builder.
74-
static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* schema,
75-
const WriterProperties* properties);
75+
/// \param file_encryptor File level encryptor used to encrypt bloom filters of
76+
/// encrypted columns. May be null for unencrypted files. Must outlive the created
77+
/// builder.
78+
static std::unique_ptr<BloomFilterBuilder> Make(
79+
const SchemaDescriptor* schema, const WriterProperties* properties,
80+
InternalFileEncryptor* file_encryptor = NULLPTR);
7681

7782
/// \brief Start a new row group to write bloom filters, meaning that next calls
7883
/// to `CreateBloomFilter` will create bloom filters for the new row group.

cpp/src/parquet/encryption/bloom_filter_encryption_test.cc

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
#include <string>
2222

2323
#include "arrow/io/file.h"
24+
#include "arrow/io/memory.h"
2425

2526
#include "parquet/bloom_filter.h"
2627
#include "parquet/bloom_filter_reader.h"
2728
#include "parquet/encryption/test_encryption_util.h"
2829
#include "parquet/file_reader.h"
30+
#include "parquet/file_writer.h"
2931
#include "parquet/properties.h"
32+
#include "parquet/schema.h"
3033

3134
namespace parquet::encryption::test {
3235
namespace {
@@ -91,4 +94,120 @@ TEST(EncryptedBloomFilterReader, ReadEncryptedBloomFilter) {
9194
}
9295
}
9396

97+
namespace {
98+
99+
std::shared_ptr<schema::GroupNode> SingleInt64Schema(const std::string& field_name) {
100+
auto field = schema::PrimitiveNode::Make(field_name, Repetition::REQUIRED, Type::INT64,
101+
ConvertedType::NONE);
102+
return std::static_pointer_cast<schema::GroupNode>(
103+
schema::GroupNode::Make("schema", Repetition::REQUIRED, {field}));
104+
}
105+
106+
std::shared_ptr<FileEncryptionProperties> BuildEncryptionProperties(
107+
const std::string& field_name) {
108+
auto col_props =
109+
ColumnEncryptionProperties::Builder().key(kColumnEncryptionKey1)->build();
110+
ColumnPathToEncryptionPropertiesMap encrypted_columns{{field_name, col_props}};
111+
FileEncryptionProperties::Builder builder(kFooterEncryptionKey);
112+
return builder.encrypted_columns(std::move(encrypted_columns))->build();
113+
}
114+
115+
std::shared_ptr<FileDecryptionProperties> BuildDecryptionPropertiesWithExplicitKeys(
116+
const std::string& field_name) {
117+
auto col_props =
118+
ColumnDecryptionProperties::Builder(field_name).key(kColumnEncryptionKey1)->build();
119+
ColumnPathToDecryptionPropertiesMap decrypted_columns{{field_name, col_props}};
120+
FileDecryptionProperties::Builder builder;
121+
return builder.footer_key(kFooterEncryptionKey)
122+
->column_keys(std::move(decrypted_columns))
123+
->build();
124+
}
125+
126+
} // namespace
127+
128+
// Round trip, write a small encrypted file with a Bloom filter on the encrypted
129+
// column, then read it back and verify the Bloom filter contains the inserted
130+
// values and rejects values that were never inserted.
131+
TEST(EncryptedBloomFilterWriter, RoundTripEncryptedBloomFilter) {
132+
const std::string field_name = "id";
133+
constexpr int kNumValues = 64;
134+
135+
auto schema = SingleInt64Schema(field_name);
136+
137+
WriterProperties::Builder prop_builder;
138+
prop_builder.compression(Compression::UNCOMPRESSED);
139+
prop_builder.enable_bloom_filter(field_name, {});
140+
prop_builder.encryption(BuildEncryptionProperties(field_name));
141+
auto writer_properties = prop_builder.build();
142+
143+
PARQUET_ASSIGN_OR_THROW(auto sink, ::arrow::io::BufferOutputStream::Create());
144+
auto file_writer = ParquetFileWriter::Open(sink, schema, writer_properties);
145+
auto* row_group_writer = file_writer->AppendRowGroup();
146+
auto* int64_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
147+
std::vector<int64_t> values(kNumValues);
148+
for (int i = 0; i < kNumValues; ++i) {
149+
values[i] = static_cast<int64_t>(i) * 7 + 13;
150+
}
151+
int64_writer->WriteBatch(static_cast<int64_t>(values.size()), nullptr, nullptr,
152+
values.data());
153+
file_writer->Close();
154+
PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
155+
156+
ReaderProperties reader_properties = default_reader_properties();
157+
reader_properties.file_decryption_properties(
158+
BuildDecryptionPropertiesWithExplicitKeys(field_name));
159+
160+
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
161+
auto file_reader = ParquetFileReader::Open(source, reader_properties);
162+
auto& bloom_filter_reader = file_reader->GetBloomFilterReader();
163+
auto row_group_0 = bloom_filter_reader.RowGroup(0);
164+
ASSERT_NE(nullptr, row_group_0);
165+
auto filter = row_group_0->GetColumnBloomFilter(0);
166+
ASSERT_NE(nullptr, filter);
167+
168+
for (int64_t value : values) {
169+
EXPECT_TRUE(filter->FindHash(filter->Hash(value)))
170+
<< "missing inserted value " << value;
171+
}
172+
173+
for (int64_t miss : {int64_t{-1}, int64_t{1'000'000}, int64_t{1'000'001}}) {
174+
EXPECT_FALSE(filter->FindHash(filter->Hash(miss)))
175+
<< "unexpected hit for non-inserted value " << miss;
176+
}
177+
}
178+
179+
// Reading the encrypted Bloom filter without the column key must fail, this
180+
// guards against the bytes accidentally being written in plaintext.
181+
TEST(EncryptedBloomFilterWriter, ReadingWithoutKeyFails) {
182+
const std::string field_name = "id";
183+
auto schema = SingleInt64Schema(field_name);
184+
185+
WriterProperties::Builder prop_builder;
186+
prop_builder.compression(Compression::UNCOMPRESSED);
187+
prop_builder.enable_bloom_filter(field_name, {});
188+
prop_builder.encryption(BuildEncryptionProperties(field_name));
189+
auto writer_properties = prop_builder.build();
190+
191+
PARQUET_ASSIGN_OR_THROW(auto sink, ::arrow::io::BufferOutputStream::Create());
192+
auto file_writer = ParquetFileWriter::Open(sink, schema, writer_properties);
193+
auto* row_group_writer = file_writer->AppendRowGroup();
194+
auto* int64_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
195+
int64_t value = 42;
196+
int64_writer->WriteBatch(1, nullptr, nullptr, &value);
197+
file_writer->Close();
198+
PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
199+
200+
ReaderProperties reader_properties = default_reader_properties();
201+
FileDecryptionProperties::Builder dec_builder;
202+
reader_properties.file_decryption_properties(
203+
dec_builder.footer_key(kFooterEncryptionKey)->build());
204+
205+
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
206+
auto file_reader = ParquetFileReader::Open(source, reader_properties);
207+
auto& bloom_filter_reader = file_reader->GetBloomFilterReader();
208+
auto row_group_0 = bloom_filter_reader.RowGroup(0);
209+
ASSERT_NE(nullptr, row_group_0);
210+
EXPECT_THROW(row_group_0->GetColumnBloomFilter(0), ParquetException);
211+
}
212+
94213
} // namespace parquet::encryption::test

cpp/src/parquet/file_writer.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -503,11 +503,10 @@ class FileSerializer : public ParquetFileWriter::Contents {
503503

504504
void WriteBloomFilter() {
505505
if (bloom_filter_builder_ != nullptr) {
506-
if (properties_->file_encryption_properties()) {
507-
ParquetException::NYI("Encryption is not currently supported with bloom filter");
508-
}
509506
// Serialize bloom filter after all row groups have been written and report
510-
// location to the file metadata.
507+
// location to the file metadata. Bloom filters of encrypted columns are
508+
// encrypted using each column's metadata encryptor (the builder was
509+
// constructed with the file-level encryptor when encryption was enabled).
511510
auto locations = bloom_filter_builder_->WriteTo(sink_.get());
512511
metadata_->SetIndexLocations(IndexKind::kBloomFilter, locations);
513512
}
@@ -575,7 +574,8 @@ class FileSerializer : public ParquetFileWriter::Contents {
575574
}
576575
}
577576
if (properties_->bloom_filter_enabled()) {
578-
bloom_filter_builder_ = BloomFilterBuilder::Make(schema(), properties_.get());
577+
bloom_filter_builder_ =
578+
BloomFilterBuilder::Make(schema(), properties_.get(), file_encryptor_.get());
579579
}
580580
if (properties_->page_index_enabled()) {
581581
page_index_builder_ = PageIndexBuilder::Make(&schema_, file_encryptor_.get());

0 commit comments

Comments
 (0)